diff --git a/.gitignore b/.gitignore index 3679c8f0c..441ef20e1 100644 --- a/.gitignore +++ b/.gitignore @@ -55,3 +55,4 @@ nbproject/ .serena/ .bob/ claudedocs +backlog/ diff --git a/extras/task-store-database-jpa/src/main/java/io/a2a/extras/taskstore/database/jpa/JpaDatabaseTaskStore.java b/extras/task-store-database-jpa/src/main/java/io/a2a/extras/taskstore/database/jpa/JpaDatabaseTaskStore.java index 086e640e3..3a704ee2a 100644 --- a/extras/task-store-database-jpa/src/main/java/io/a2a/extras/taskstore/database/jpa/JpaDatabaseTaskStore.java +++ b/extras/task-store-database-jpa/src/main/java/io/a2a/extras/taskstore/database/jpa/JpaDatabaseTaskStore.java @@ -13,6 +13,7 @@ import jakarta.inject.Inject; import jakarta.persistence.EntityManager; import jakarta.persistence.PersistenceContext; +import jakarta.persistence.PersistenceException; import jakarta.persistence.TypedQuery; import jakarta.transaction.Transactional; @@ -22,6 +23,8 @@ import io.a2a.server.config.A2AConfigProvider; import io.a2a.server.tasks.TaskStateProvider; import io.a2a.server.tasks.TaskStore; +import io.a2a.server.tasks.TaskSerializationException; +import io.a2a.server.tasks.TaskPersistenceException; import io.a2a.spec.Artifact; import io.a2a.spec.ListTasksParams; import io.a2a.spec.Message; @@ -63,6 +66,7 @@ void initConfig() { gracePeriodSeconds = Long.parseLong(configProvider.getValue(A2A_REPLICATION_GRACE_PERIOD_SECONDS)); } + @Transactional @Override public void save(Task task, boolean isReplicated) { @@ -85,7 +89,12 @@ public void save(Task task, boolean isReplicated) { } } catch (JsonProcessingException e) { LOGGER.error("Failed to serialize task with ID: {}", task.id(), e); - throw new RuntimeException("Failed to serialize task with ID: " + task.id(), e); + throw new TaskSerializationException(task.id(), + "Failed to serialize task for persistence", e); + } catch (PersistenceException e) { + LOGGER.error("Database save failed for task with ID: {}", task.id(), e); + throw new TaskPersistenceException(task.id(), + "Database save failed for task", e); } } @@ -93,19 +102,27 @@ public void save(Task task, boolean isReplicated) { @Override public Task get(String taskId) { LOGGER.debug("Retrieving task with ID: {}", taskId); - JpaTask jpaTask = em.find(JpaTask.class, taskId); - if (jpaTask == null) { - LOGGER.debug("Task not found with ID: {}", taskId); - return null; - } - try { - Task task = jpaTask.getTask(); - LOGGER.debug("Successfully retrieved task with ID: {}", taskId); - return task; - } catch (JsonProcessingException e) { - LOGGER.error("Failed to deserialize task with ID: {}", taskId, e); - throw new RuntimeException("Failed to deserialize task with ID: " + taskId, e); + JpaTask jpaTask = em.find(JpaTask.class, taskId); + if (jpaTask == null) { + LOGGER.debug("Task not found with ID: {}", taskId); + return null; + } + + try { + Task task = jpaTask.getTask(); + LOGGER.debug("Successfully retrieved task with ID: {}", taskId); + return task; + } catch (JsonProcessingException e) { + LOGGER.error("Failed to deserialize task with ID: {}", taskId, e); + throw new TaskSerializationException(taskId, + "Failed to deserialize task from database", e); + } + + } catch (PersistenceException e) { + LOGGER.error("Database retrieval failed for task with ID: {}", taskId, e); + throw new TaskPersistenceException(taskId, + "Database retrieval failed for task", e); } } @@ -113,12 +130,18 @@ public Task get(String taskId) { @Override public void delete(String taskId) { LOGGER.debug("Deleting task with ID: {}", taskId); - JpaTask jpaTask = em.find(JpaTask.class, taskId); - if (jpaTask != null) { - em.remove(jpaTask); - LOGGER.debug("Successfully deleted task with ID: {}", taskId); - } else { - LOGGER.debug("Task not found for deletion with ID: {}", taskId); + try { + JpaTask jpaTask = em.find(JpaTask.class, taskId); + if (jpaTask != null) { + em.remove(jpaTask); + LOGGER.debug("Successfully deleted task with ID: {}", taskId); + } else { + LOGGER.debug("Task not found for deletion with ID: {}", taskId); + } + } catch (PersistenceException e) { + LOGGER.error("Database deletion failed for task with ID: {}", taskId, e); + throw new TaskPersistenceException(taskId, + "Database deletion failed for task", e); } } @@ -231,117 +254,126 @@ public ListTasksResult list(ListTasksParams params) { LOGGER.debug("Listing tasks with params: contextId={}, status={}, pageSize={}, pageToken={}", params.contextId(), params.status(), params.pageSize(), params.pageToken()); - // Parse pageToken once at the beginning - PageToken pageToken = PageToken.fromString(params.pageToken()); - Instant tokenTimestamp = pageToken != null ? pageToken.timestamp() : null; - String tokenId = pageToken != null ? pageToken.id() : null; - - // Build dynamic JPQL query with WHERE clauses for filtering - StringBuilder queryBuilder = new StringBuilder("SELECT t FROM JpaTask t WHERE 1=1"); - StringBuilder countQueryBuilder = new StringBuilder("SELECT COUNT(t) FROM JpaTask t WHERE 1=1"); - - // Apply contextId filter using denormalized column - if (params.contextId() != null) { - queryBuilder.append(" AND t.contextId = :contextId"); - countQueryBuilder.append(" AND t.contextId = :contextId"); - } + try { + // Parse pageToken once at the beginning + PageToken pageToken = PageToken.fromString(params.pageToken()); + Instant tokenTimestamp = pageToken != null ? pageToken.timestamp() : null; + String tokenId = pageToken != null ? pageToken.id() : null; + + // Build dynamic JPQL query with WHERE clauses for filtering + StringBuilder queryBuilder = new StringBuilder("SELECT t FROM JpaTask t WHERE 1=1"); + StringBuilder countQueryBuilder = new StringBuilder("SELECT COUNT(t) FROM JpaTask t WHERE 1=1"); + + // Apply contextId filter using denormalized column + if (params.contextId() != null) { + queryBuilder.append(" AND t.contextId = :contextId"); + countQueryBuilder.append(" AND t.contextId = :contextId"); + } - // Apply status filter using denormalized column - if (params.status() != null) { - queryBuilder.append(" AND t.state = :state"); - countQueryBuilder.append(" AND t.state = :state"); - } + // Apply status filter using denormalized column + if (params.status() != null) { + queryBuilder.append(" AND t.state = :state"); + countQueryBuilder.append(" AND t.state = :state"); + } - // Apply statusTimestampAfter filter using denormalized timestamp column - if (params.statusTimestampAfter() != null) { - queryBuilder.append(" AND t.statusTimestamp > :statusTimestampAfter"); - countQueryBuilder.append(" AND t.statusTimestamp > :statusTimestampAfter"); - } + // Apply statusTimestampAfter filter using denormalized timestamp column + if (params.statusTimestampAfter() != null) { + queryBuilder.append(" AND t.statusTimestamp > :statusTimestampAfter"); + countQueryBuilder.append(" AND t.statusTimestamp > :statusTimestampAfter"); + } - // Apply pagination cursor using keyset pagination for composite sort (timestamp DESC, id ASC) - if (tokenTimestamp != null) { - // Keyset pagination: get tasks where timestamp < tokenTimestamp OR (timestamp = tokenTimestamp AND id > tokenId) - queryBuilder.append(" AND (t.statusTimestamp < :tokenTimestamp OR (t.statusTimestamp = :tokenTimestamp AND t.id > :tokenId))"); - } + // Apply pagination cursor using keyset pagination for composite sort (timestamp DESC, id ASC) + if (tokenTimestamp != null) { + // Keyset pagination: get tasks where timestamp < tokenTimestamp OR (timestamp = tokenTimestamp AND id > tokenId) + queryBuilder.append(" AND (t.statusTimestamp < :tokenTimestamp OR (t.statusTimestamp = :tokenTimestamp AND t.id > :tokenId))"); + } - // Sort by status timestamp descending (most recent first), then by ID for stable ordering - queryBuilder.append(" ORDER BY t.statusTimestamp DESC, t.id ASC"); + // Sort by status timestamp descending (most recent first), then by ID for stable ordering + queryBuilder.append(" ORDER BY t.statusTimestamp DESC, t.id ASC"); - // Create and configure the main query - TypedQuery query = em.createQuery(queryBuilder.toString(), JpaTask.class); + // Create and configure the main query + TypedQuery query = em.createQuery(queryBuilder.toString(), JpaTask.class); - // Set filter parameters - if (params.contextId() != null) { - query.setParameter("contextId", params.contextId()); - } - if (params.status() != null) { - query.setParameter("state", params.status().name()); - } - if (params.statusTimestampAfter() != null) { - query.setParameter("statusTimestampAfter", params.statusTimestampAfter()); - } - if (tokenTimestamp != null) { - query.setParameter("tokenTimestamp", tokenTimestamp); - query.setParameter("tokenId", tokenId); - } + // Set filter parameters + if (params.contextId() != null) { + query.setParameter("contextId", params.contextId()); + } + if (params.status() != null) { + query.setParameter("state", params.status().name()); + } + if (params.statusTimestampAfter() != null) { + query.setParameter("statusTimestampAfter", params.statusTimestampAfter()); + } + if (tokenTimestamp != null) { + query.setParameter("tokenTimestamp", tokenTimestamp); + query.setParameter("tokenId", tokenId); + } - // Apply page size limit (+1 to check for next page) - int pageSize = params.getEffectivePageSize(); - query.setMaxResults(pageSize + 1); + // Apply page size limit (+1 to check for next page) + int pageSize = params.getEffectivePageSize(); + query.setMaxResults(pageSize + 1); - // Execute query and deserialize tasks - List jpaTasksPage = query.getResultList(); + // Execute query and deserialize tasks + List jpaTasksPage = query.getResultList(); - // Determine if there are more results - boolean hasMore = jpaTasksPage.size() > pageSize; - if (hasMore) { - jpaTasksPage = jpaTasksPage.subList(0, pageSize); - } + // Determine if there are more results + boolean hasMore = jpaTasksPage.size() > pageSize; + if (hasMore) { + jpaTasksPage = jpaTasksPage.subList(0, pageSize); + } - // Get total count of matching tasks - TypedQuery countQuery = em.createQuery(countQueryBuilder.toString(), Long.class); - if (params.contextId() != null) { - countQuery.setParameter("contextId", params.contextId()); - } - if (params.status() != null) { - countQuery.setParameter("state", params.status().name()); - } - if (params.statusTimestampAfter() != null) { - countQuery.setParameter("statusTimestampAfter", params.statusTimestampAfter()); - } - int totalSize = countQuery.getSingleResult().intValue(); + // Get total count of matching tasks + TypedQuery countQuery = em.createQuery(countQueryBuilder.toString(), Long.class); + if (params.contextId() != null) { + countQuery.setParameter("contextId", params.contextId()); + } + if (params.status() != null) { + countQuery.setParameter("state", params.status().name()); + } + if (params.statusTimestampAfter() != null) { + countQuery.setParameter("statusTimestampAfter", params.statusTimestampAfter()); + } + int totalSize = countQuery.getSingleResult().intValue(); + + // Deserialize tasks from JSON + List tasks = new ArrayList<>(); + for (JpaTask jpaTask : jpaTasksPage) { + try { + tasks.add(jpaTask.getTask()); + } catch (JsonProcessingException e) { + LOGGER.error("Failed to deserialize task with ID: {}", jpaTask.getId(), e); + throw new TaskSerializationException(jpaTask.getId(), + "Failed to deserialize task during list operation", e); + } + } - // Deserialize tasks from JSON - List tasks = new ArrayList<>(); - for (JpaTask jpaTask : jpaTasksPage) { - try { - tasks.add(jpaTask.getTask()); - } catch (JsonProcessingException e) { - LOGGER.error("Failed to deserialize task with ID: {}", jpaTask.getId(), e); - throw new RuntimeException("Failed to deserialize task with ID: " + jpaTask.getId(), e); + // Determine next page token (timestamp:ID of last task if there are more results) + // Format: "timestamp_millis:taskId" for keyset pagination + String nextPageToken = null; + if (hasMore && !tasks.isEmpty()) { + Task lastTask = tasks.get(tasks.size() - 1); + // All tasks have timestamps (TaskStatus canonical constructor ensures this) + Instant timestamp = lastTask.status().timestamp().toInstant(); + nextPageToken = new PageToken(timestamp, lastTask.id()).toString(); } - } - // Determine next page token (timestamp:ID of last task if there are more results) - // Format: "timestamp_millis:taskId" for keyset pagination - String nextPageToken = null; - if (hasMore && !tasks.isEmpty()) { - Task lastTask = tasks.get(tasks.size() - 1); - // All tasks have timestamps (TaskStatus canonical constructor ensures this) - Instant timestamp = lastTask.status().timestamp().toInstant(); - nextPageToken = new PageToken(timestamp, lastTask.id()).toString(); - } + // Apply post-processing transformations (history limiting, artifact removal) + int historyLength = params.getEffectiveHistoryLength(); + boolean includeArtifacts = params.shouldIncludeArtifacts(); - // Apply post-processing transformations (history limiting, artifact removal) - int historyLength = params.getEffectiveHistoryLength(); - boolean includeArtifacts = params.shouldIncludeArtifacts(); + List transformedTasks = tasks.stream() + .map(task -> transformTask(task, historyLength, includeArtifacts)) + .toList(); - List transformedTasks = tasks.stream() - .map(task -> transformTask(task, historyLength, includeArtifacts)) - .toList(); + LOGGER.debug("Returning {} tasks out of {} total", transformedTasks.size(), totalSize); + return new ListTasksResult(transformedTasks, totalSize, transformedTasks.size(), nextPageToken); - LOGGER.debug("Returning {} tasks out of {} total", transformedTasks.size(), totalSize); - return new ListTasksResult(transformedTasks, totalSize, transformedTasks.size(), nextPageToken); + } catch (PersistenceException e) { + // Database errors from query creation, execution, or count + LOGGER.error("Database query failed during list operation", e); + throw new TaskPersistenceException(null, // No single taskId for list operation + "Database query failed during list operation", e); + } } private Task transformTask(Task task, int historyLength, boolean includeArtifacts) { diff --git a/server-common/src/main/java/io/a2a/server/events/MainEventBusProcessor.java b/server-common/src/main/java/io/a2a/server/events/MainEventBusProcessor.java index 575078679..d7eece30b 100644 --- a/server-common/src/main/java/io/a2a/server/events/MainEventBusProcessor.java +++ b/server-common/src/main/java/io/a2a/server/events/MainEventBusProcessor.java @@ -10,6 +10,8 @@ import io.a2a.server.tasks.PushNotificationSender; import io.a2a.server.tasks.TaskManager; +import io.a2a.server.tasks.TaskPersistenceException; +import io.a2a.server.tasks.TaskSerializationException; import io.a2a.server.tasks.TaskStore; import io.a2a.spec.A2AError; import io.a2a.spec.A2AServerException; @@ -41,6 +43,18 @@ * Note: This bean is eagerly initialized by {@link MainEventBusProcessorInitializer} * to ensure the background thread starts automatically when the application starts. *

+ * + *

Exception Handling

+ * TaskStore persistence failures are caught and handled gracefully: + *
    + *
  • {@link TaskSerializationException} - Data corruption or schema mismatch. + * Logged at ERROR level, distributed as {@link InternalError} to clients.
  • + *
  • {@link TaskPersistenceException} - Database/storage system failure. + * Logged at ERROR level, distributed as {@link InternalError} to clients.
  • + *
+ * + *

Processing continues after errors - the failed event is distributed as InternalError + * to all ChildQueues, and the MainEventBusProcessor continues consuming subsequent events.

*/ @ApplicationScoped public class MainEventBusProcessor implements Runnable { @@ -293,11 +307,26 @@ private boolean updateTaskStore(String taskId, Event event, boolean isReplicated LOGGER.debug("TaskStore updated via TaskManager.process() for task {}: {} (final: {}, replicated: {})", taskId, event.getClass().getSimpleName(), isFinal, isReplicated); return isFinal; + + } catch (TaskSerializationException e) { + // Data corruption or schema mismatch - ALWAYS permanent + LOGGER.error("Task {} event serialization failed - data corruption detected: {}", + taskId, e.getMessage(), e); + throw new InternalError("Failed to serialize task " + taskId + ": " + e.getMessage()); + + } catch (TaskPersistenceException e) { + // Database/storage failure + LOGGER.error("Task {} event persistence failed: {}", taskId, e.getMessage(), e); + throw new InternalError("Storage failure for task " + taskId + ": " + e.getMessage()); + } catch (InternalError e) { + // Already an InternalError from TaskManager validation - pass through LOGGER.error("Error updating TaskStore via TaskManager for task {}", taskId, e); // Rethrow to prevent distributing unpersisted event to clients throw e; + } catch (Exception e) { + // Unexpected exception type - treat as permanent failure LOGGER.error("Unexpected error updating TaskStore for task {}", taskId, e); // Rethrow to prevent distributing unpersisted event to clients throw new InternalError("TaskStore persistence failed: " + e.getMessage()); diff --git a/server-common/src/main/java/io/a2a/server/tasks/InMemoryTaskStore.java b/server-common/src/main/java/io/a2a/server/tasks/InMemoryTaskStore.java index 15f94d7e8..7c443b1cc 100644 --- a/server-common/src/main/java/io/a2a/server/tasks/InMemoryTaskStore.java +++ b/server-common/src/main/java/io/a2a/server/tasks/InMemoryTaskStore.java @@ -25,6 +25,62 @@ *

* This is the default TaskStore used when no other implementation is provided. *

+ * + *

Exception Behavior

+ * InMemoryTaskStore has minimal exception scenarios compared to database-backed implementations: + *
    + *
  • No TaskSerializationException: Task objects are stored directly in memory without + * serialization. No JSON parsing or schema compatibility issues can occur.
  • + *
  • No TaskPersistenceException: ConcurrentHashMap operations do not involve I/O, + * network, or transactional concerns. Standard put/get/remove operations are guaranteed + * to succeed under normal JVM operation.
  • + *
  • OutOfMemoryError (potential): The only failure scenario is JVM heap exhaustion if + * too many tasks are stored. This is an {@link Error} (not Exception) and indicates a fatal + * system condition requiring JVM restart and capacity planning.
  • + *
+ * + *

Design Rationale

+ * This implementation intentionally does NOT throw {@link TaskStoreException} or its subclasses + * because: + *
    + *
  • No serialization step exists - tasks stored as Java objects
  • + *
  • No I/O or network operations that can fail
  • + *
  • ConcurrentHashMap guarantees thread-safe operations without checked exceptions
  • + *
  • Memory exhaustion (OutOfMemoryError) is an unrecoverable system failure
  • + *
+ * + *

Comparison to Database Implementations

+ * Database-backed implementations (e.g., JpaDatabaseTaskStore) throw exceptions for: + *
    + *
  • Serialization errors (JSON parsing, schema mismatches)
  • + *
  • Connection failures (network, timeouts)
  • + *
  • Transaction failures (deadlocks, constraint violations)
  • + *
  • Capacity issues (disk full, quota exceeded)
  • + *
+ * InMemoryTaskStore avoids all of these by operating entirely in-process. + * + *

Memory Management Considerations

+ * Callers should monitor memory usage and implement task cleanup policies: + *
{@code
+ * // Example: Delete finalized tasks older than 48 hours
+ * ListTasksParams params = new ListTasksParams.Builder()
+ *     .statusTimestampBefore(Instant.now().minus(Duration.ofHours(48)))
+ *     .build();
+ *
+ * List oldTasks = taskStore.list(params).tasks();
+ * oldTasks.stream()
+ *     .filter(task -> task.status().state().isFinal())
+ *     .forEach(task -> taskStore.delete(task.id()));
+ * }
+ * + *

Thread Safety

+ * All operations are thread-safe via {@link ConcurrentHashMap}. Multiple threads can + * concurrently save, get, list, and delete tasks without synchronization. Last-write-wins + * semantics apply for concurrent {@code save()} calls to the same task ID. + * + * @see TaskStore for interface contract and exception documentation + * @see TaskStoreException for exception hierarchy (not thrown by this implementation) + * @see TaskStateProvider for queue lifecycle integration */ @ApplicationScoped public class InMemoryTaskStore implements TaskStore, TaskStateProvider { diff --git a/server-common/src/main/java/io/a2a/server/tasks/PushNotificationSender.java b/server-common/src/main/java/io/a2a/server/tasks/PushNotificationSender.java index f8b7b018d..093241182 100644 --- a/server-common/src/main/java/io/a2a/server/tasks/PushNotificationSender.java +++ b/server-common/src/main/java/io/a2a/server/tasks/PushNotificationSender.java @@ -63,7 +63,6 @@ * Implementations should handle errors gracefully: *
    *
  • Log failures but don't throw exceptions (notifications are best-effort)
  • - *
  • Consider retry logic for transient failures
  • *
  • Don't block on network I/O - execute asynchronously if needed
  • *
  • Circuit breaker patterns for repeatedly failing endpoints
  • *
diff --git a/server-common/src/main/java/io/a2a/server/tasks/TaskPersistenceException.java b/server-common/src/main/java/io/a2a/server/tasks/TaskPersistenceException.java new file mode 100644 index 000000000..97007cab9 --- /dev/null +++ b/server-common/src/main/java/io/a2a/server/tasks/TaskPersistenceException.java @@ -0,0 +1,91 @@ +package io.a2a.server.tasks; + +import org.jspecify.annotations.Nullable; + +/** + * Exception for database/storage system failures during task persistence operations. + *

+ * Indicates failures in the underlying storage system (database, filesystem, etc.) rather + * than data format issues. + * + *

Common Scenarios

+ *
    + *
  • Database connection timeout or network partition
  • + *
  • Transaction deadlock or lock wait timeout
  • + *
  • Connection pool exhausted
  • + *
  • Disk full / storage quota exceeded
  • + *
  • Database constraint violations (unique key, foreign key)
  • + *
  • Insufficient permissions or authentication failures
  • + *
  • Database schema incompatibilities
  • + *
+ * + *

Usage Example

+ *
{@code
+ * try {
+ *     em.merge(jpaTask);
+ * } catch (PersistenceException e) {
+ *     throw new TaskPersistenceException(taskId, "Database save failed", e);
+ * }
+ * }
+ * + * @see TaskStoreException + * @see TaskSerializationException for data format errors + */ +public class TaskPersistenceException extends TaskStoreException { + + /** + * Creates a new TaskPersistenceException with no message or cause. + */ + public TaskPersistenceException() { + super(); + } + + /** + * Creates a new TaskPersistenceException with the specified message. + * + * @param msg the exception message + */ + public TaskPersistenceException(final String msg) { + super(msg); + } + + /** + * Creates a new TaskPersistenceException with the specified cause. + * + * @param cause the underlying cause + */ + public TaskPersistenceException(final Throwable cause) { + super(cause); + } + + /** + * Creates a new TaskPersistenceException with the specified message and cause. + * + * @param msg the exception message + * @param cause the underlying cause + */ + public TaskPersistenceException(final String msg, final Throwable cause) { + super(msg, cause); + } + + /** + * Creates a new TaskPersistenceException with the specified task ID and message. + * + * @param taskId the task identifier (may be null for operations not tied to a specific task) + * @param msg the exception message + */ + public TaskPersistenceException(@Nullable final String taskId, final String msg) { + super(taskId, msg); + } + + /** + * Creates a new TaskPersistenceException with the specified task ID, message, and cause. + * + * @param taskId the task identifier (may be null for operations not tied to a specific task) + * @param msg the exception message + * @param cause the underlying cause + */ + public TaskPersistenceException(@Nullable final String taskId, final String msg, final Throwable cause) { + super(taskId, msg, cause); + } +} diff --git a/server-common/src/main/java/io/a2a/server/tasks/TaskSerializationException.java b/server-common/src/main/java/io/a2a/server/tasks/TaskSerializationException.java new file mode 100644 index 000000000..5916ce530 --- /dev/null +++ b/server-common/src/main/java/io/a2a/server/tasks/TaskSerializationException.java @@ -0,0 +1,87 @@ +package io.a2a.server.tasks; + +import org.jspecify.annotations.Nullable; + +/** + * Exception for task serialization/deserialization failures. + *

+ * Indicates failures converting between Task domain objects and persistent storage format (JSON). + * + *

Common Scenarios

+ *
    + *
  • JSON parsing errors during {@code get()} operations
  • + *
  • JSON serialization errors during {@code save()} operations
  • + *
  • Invalid enum values or missing required fields
  • + *
  • Data format version mismatches after upgrades
  • + *
+ * + *

Usage Example

+ *
{@code
+ * try {
+ *     Task task = jsonMapper.readValue(json, Task.class);
+ * } catch (JsonProcessingException e) {
+ *     throw new TaskSerializationException(taskId, "Failed to deserialize task", e);
+ * }
+ * }
+ * + * @see TaskStoreException + * @see TaskPersistenceException for database failures + */ +public class TaskSerializationException extends TaskStoreException { + + /** + * Creates a new TaskSerializationException with no message or cause. + */ + public TaskSerializationException() { + super(); + } + + /** + * Creates a new TaskSerializationException with the specified message. + * + * @param msg the exception message + */ + public TaskSerializationException(final String msg) { + super(msg); + } + + /** + * Creates a new TaskSerializationException with the specified cause. + * + * @param cause the underlying cause + */ + public TaskSerializationException(final Throwable cause) { + super(cause); + } + + /** + * Creates a new TaskSerializationException with the specified message and cause. + * + * @param msg the exception message + * @param cause the underlying cause + */ + public TaskSerializationException(final String msg, final Throwable cause) { + super(msg, cause); + } + + /** + * Creates a new TaskSerializationException with the specified task ID and message. + * + * @param taskId the task identifier (may be null for operations not tied to a specific task) + * @param msg the exception message + */ + public TaskSerializationException(@Nullable final String taskId, final String msg) { + super(taskId, msg); + } + + /** + * Creates a new TaskSerializationException with the specified task ID, message, and cause. + * + * @param taskId the task identifier (may be null for operations not tied to a specific task) + * @param msg the exception message + * @param cause the underlying cause + */ + public TaskSerializationException(@Nullable final String taskId, final String msg, final Throwable cause) { + super(taskId, msg, cause); + } +} diff --git a/server-common/src/main/java/io/a2a/server/tasks/TaskStore.java b/server-common/src/main/java/io/a2a/server/tasks/TaskStore.java index 3df903f77..7ae7a46bf 100644 --- a/server-common/src/main/java/io/a2a/server/tasks/TaskStore.java +++ b/server-common/src/main/java/io/a2a/server/tasks/TaskStore.java @@ -90,10 +90,64 @@ *
  • Consider caching for frequently-accessed task lists
  • * * + *

    Exception Contract

    + * All TaskStore methods may throw {@link TaskStoreException} or its subclasses to indicate + * persistence failures: + *
      + *
    • {@link TaskSerializationException} - JSON/data format errors
    • + *
    • {@link TaskPersistenceException} - Database/storage system failures
    • + *
    + * + *

    When to Throw TaskSerializationException

    + * Use when task data cannot be serialized or deserialized: + *
      + *
    • JSON parsing errors during {@code get()} operations
    • + *
    • JSON serialization errors during {@code save()} operations
    • + *
    • Invalid enum values or missing required fields
    • + *
    • Schema version mismatches after upgrades
    • + *
    + * + *

    When to Throw TaskPersistenceException

    + * Use when the storage system fails: + *
      + *
    • Database connection timeouts
    • + *
    • Transaction deadlocks
    • + *
    • Connection pool exhausted
    • + *
    • Disk full / quota exceeded
    • + *
    • Database constraint violations
    • + *
    • Insufficient permissions
    • + *
    + * + *

    Implementer Example

    + *
    {@code
    + * @Override
    + * public void save(Task task, boolean isReplicated) {
    + *     try {
    + *         String json = objectMapper.writeValueAsString(task);
    + *     } catch (JsonProcessingException e) {
    + *         throw new TaskSerializationException(task.id(), "Failed to serialize task", e);
    + *     }
    + *
    + *     try {
    + *         entityManager.merge(toEntity(json));
    + *     } catch (PersistenceException e) {
    + *         throw new TaskPersistenceException(task.id(), "Database save failed", e);
    + *     }
    + * }
    + * }
    + * + *

    Exception Handling

    + * {@link io.a2a.server.events.MainEventBusProcessor} catches TaskStore exceptions and + * wraps them in {@link io.a2a.spec.InternalError} events for client distribution. + * * @see TaskManager * @see TaskStateProvider + * @see TaskStoreException + * @see TaskSerializationException + * @see TaskPersistenceException * @see InMemoryTaskStore * @see io.a2a.server.requesthandlers.DefaultRequestHandler + * @see io.a2a.server.events.MainEventBusProcessor */ public interface TaskStore { /** @@ -103,6 +157,10 @@ public interface TaskStore { * @param isReplicated true if this task update came from a replicated event, * false if it originated locally. Used to prevent feedback loops * in replicated scenarios (e.g., don't fire TaskFinalizedEvent for replicated updates) + * @throws TaskSerializationException if the task cannot be serialized to storage format (JSON parsing error, + * invalid field values, schema mismatch) + * @throws TaskPersistenceException if the storage system fails (database timeout, connection error, disk full) + * @throws TaskStoreException for other persistence failures not covered by specific subclasses */ void save(Task task, boolean isReplicated); @@ -111,6 +169,11 @@ public interface TaskStore { * * @param taskId the task identifier * @return the task if found, null otherwise + * @throws TaskSerializationException if the persisted task data cannot be deserialized (corrupted JSON, + * schema incompatibility) + * @throws TaskPersistenceException if the storage system fails during retrieval (database connection error, + * query timeout) + * @throws TaskStoreException for other retrieval failures not covered by specific subclasses */ @Nullable Task get(String taskId); @@ -118,6 +181,9 @@ public interface TaskStore { * Deletes a task by its ID. * * @param taskId the task identifier + * @throws TaskPersistenceException if the storage system fails during deletion (database connection error, + * transaction timeout, constraint violation) + * @throws TaskStoreException for other deletion failures not covered by specific subclasses */ void delete(String taskId); @@ -126,6 +192,11 @@ public interface TaskStore { * * @param params the filtering and pagination parameters * @return the list of tasks matching the criteria with pagination info + * @throws TaskSerializationException if any persisted task data cannot be deserialized during listing + * (corrupted JSON in database) + * @throws TaskPersistenceException if the storage system fails during the list operation (database query timeout, + * connection error) + * @throws TaskStoreException for other listing failures not covered by specific subclasses */ ListTasksResult list(ListTasksParams params); } diff --git a/server-common/src/main/java/io/a2a/server/tasks/TaskStoreException.java b/server-common/src/main/java/io/a2a/server/tasks/TaskStoreException.java new file mode 100644 index 000000000..5865c5a80 --- /dev/null +++ b/server-common/src/main/java/io/a2a/server/tasks/TaskStoreException.java @@ -0,0 +1,113 @@ +package io.a2a.server.tasks; + +import io.a2a.spec.A2AServerException; +import org.jspecify.annotations.Nullable; + +/** + * Base exception for TaskStore persistence layer failures. + *

    + * Root exception for all task storage and retrieval errors. Specialized subclasses + * provide specific failure contexts: + *

      + *
    • {@link TaskSerializationException} - JSON/data format errors
    • + *
    • {@link TaskPersistenceException} - Database/storage system failures
    • + *
    + * + *

    Usage Context

    + * Thrown by {@link TaskStore} implementations during: + *
      + *
    • {@code save(Task, boolean)} - Task persistence failures
    • + *
    • {@code get(String)} - Task retrieval failures
    • + *
    • {@code delete(String)} - Task deletion failures
    • + *
    • {@code list(ListTasksParams)} - Task listing failures
    • + *
    + * + *

    Error Handling Pattern

    + * Caught by {@link io.a2a.server.events.MainEventBusProcessor} which: + *
      + *
    1. Logs the failure with full context (taskId, operation)
    2. + *
    3. Distributes {@link io.a2a.spec.InternalError} event to clients
    4. + *
    5. Preserves exception cause chain for diagnostics
    6. + *
    + * + * @see TaskSerializationException for data format errors + * @see TaskPersistenceException for storage system failures + * @see TaskStore + */ +public class TaskStoreException extends A2AServerException { + + @Nullable + private final String taskId; + + /** + * Creates a new TaskStoreException with no message or cause. + */ + public TaskStoreException() { + super(); + this.taskId = null; + } + + /** + * Creates a new TaskStoreException with the specified message. + * + * @param msg the exception message + */ + public TaskStoreException(final String msg) { + super(msg); + this.taskId = null; + } + + /** + * Creates a new TaskStoreException with the specified cause. + * + * @param cause the underlying cause + */ + public TaskStoreException(final Throwable cause) { + super(cause); + this.taskId = null; + } + + /** + * Creates a new TaskStoreException with the specified message and cause. + * + * @param msg the exception message + * @param cause the underlying cause + */ + public TaskStoreException(final String msg, final Throwable cause) { + super(msg, cause); + this.taskId = null; + } + + /** + * Creates a new TaskStoreException with the specified task ID and message. + * + * @param taskId the task identifier (may be null for operations not tied to a specific task) + * @param msg the exception message + */ + public TaskStoreException(@Nullable final String taskId, final String msg) { + super(msg); + this.taskId = taskId; + } + + /** + * Creates a new TaskStoreException with the specified task ID, message, and cause. + * + * @param taskId the task identifier (may be null for operations not tied to a specific task) + * @param msg the exception message + * @param cause the underlying cause + */ + public TaskStoreException(@Nullable final String taskId, final String msg, final Throwable cause) { + super(msg, cause); + this.taskId = taskId; + } + + /** + * Returns the task ID associated with this exception. + * + * @return the task ID, or null if not associated with a specific task + */ + @Nullable + public String getTaskId() { + return taskId; + } +} diff --git a/server-common/src/test/java/io/a2a/server/events/MainEventBusProcessorExceptionTest.java b/server-common/src/test/java/io/a2a/server/events/MainEventBusProcessorExceptionTest.java new file mode 100644 index 000000000..8bb212601 --- /dev/null +++ b/server-common/src/test/java/io/a2a/server/events/MainEventBusProcessorExceptionTest.java @@ -0,0 +1,270 @@ +package io.a2a.server.events; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import io.a2a.server.tasks.PushNotificationSender; +import io.a2a.server.tasks.TaskManager; +import io.a2a.server.tasks.TaskPersistenceException; +import io.a2a.server.tasks.TaskSerializationException; +import io.a2a.server.tasks.TaskStore; +import io.a2a.spec.Event; +import io.a2a.spec.InternalError; +import io.a2a.spec.Task; +import io.a2a.spec.TaskState; +import io.a2a.spec.TaskStatus; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.LoggerFactory; +import ch.qos.logback.classic.Level; +import ch.qos.logback.classic.Logger; +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.core.read.ListAppender; + +/** + * Integration tests for MainEventBusProcessor exception handling. + *

    + * Tests verify that TaskStore persistence failures are converted to InternalError events + * and distributed to clients with appropriate logging based on failure type: + *

      + *
    • TaskSerializationException → ERROR log + InternalError
    • + *
    • TaskPersistenceException → ERROR log + InternalError
    • + *
    + */ +public class MainEventBusProcessorExceptionTest { + + private static final PushNotificationSender NOOP_PUSHNOTIFICATION_SENDER = task -> {}; + private static final String TASK_ID = "test-task-123"; + + private MainEventBus mainEventBus; + private MainEventBusProcessor mainEventBusProcessor; + private TaskStore mockTaskStore; + private InMemoryQueueManager queueManager; + private EventQueue eventQueue; + private ListAppender logAppender; + + @BeforeEach + public void setUp() { + // Set up mock TaskStore + mockTaskStore = mock(TaskStore.class); + + // Set up MainEventBus and processor with mock TaskStore + mainEventBus = new MainEventBus(); + queueManager = new InMemoryQueueManager(null, mainEventBus); // null TaskStateProvider for tests + mainEventBusProcessor = new MainEventBusProcessor(mainEventBus, mockTaskStore, + NOOP_PUSHNOTIFICATION_SENDER, queueManager); + + // Set up log capture for verifying error messages + Logger logger = (Logger) LoggerFactory.getLogger(MainEventBusProcessor.class); + logAppender = new ListAppender<>(); + logAppender.start(); + logger.addAppender(logAppender); + + // Start processor + EventQueueUtil.start(mainEventBusProcessor); + + // Create event queue for testing + eventQueue = EventQueueUtil.getEventQueueBuilder(mainEventBus) + .taskId(TASK_ID) + .mainEventBus(mainEventBus) + .build().tap(); + } + + @AfterEach + public void tearDown() { + if (mainEventBusProcessor != null) { + mainEventBusProcessor.setCallback(null); + EventQueueUtil.stop(mainEventBusProcessor); + } + + // Clean up log appender + Logger logger = (Logger) LoggerFactory.getLogger(MainEventBusProcessor.class); + logger.detachAppender(logAppender); + } + + /** + * Test that TaskSerializationException is converted to InternalError with ERROR log. + * AC#1: Mock TaskStore throws TaskSerializationException → MainEventBusProcessor distributes InternalError + */ + @Test + public void testTaskSerializationException_ConvertsToInternalError() throws InterruptedException { + // Arrange: Mock TaskStore to throw TaskSerializationException + String exceptionMessage = "Failed to deserialize corrupted JSON"; + TaskSerializationException exception = new TaskSerializationException(TASK_ID, exceptionMessage); + when(mockTaskStore.get(any())).thenThrow(exception); + doThrow(exception).when(mockTaskStore).save(any(Task.class), anyBoolean()); + + Task testTask = createTestTask(); + + // Act: Enqueue event and wait for processing + List distributedEvents = captureDistributedEvent(testTask); + + // Assert: Verify InternalError was distributed + assertEquals(1, distributedEvents.size(), "Should distribute exactly one event"); + Event distributedEvent = distributedEvents.get(0); + assertInstanceOf(InternalError.class, distributedEvent, + "TaskSerializationException should convert to InternalError"); + + InternalError error = (InternalError) distributedEvent; + assertTrue(error.getMessage().contains(TASK_ID), + "Error message should contain task ID: " + error.getMessage()); + assertTrue(error.getMessage().contains("serialize"), + "Error message should mention serialization: " + error.getMessage()); + + // Assert: Verify ERROR level logging + boolean foundErrorLog = logAppender.list.stream() + .anyMatch(event -> event.getLevel() == Level.ERROR + && event.getFormattedMessage().contains(TASK_ID) + && event.getFormattedMessage().contains("serialization")); + assertTrue(foundErrorLog, "Should log TaskSerializationException at ERROR level"); + } + + /** + * Test that TaskPersistenceException is converted to InternalError with ERROR log. + * AC#2: Mock TaskStore throws TaskPersistenceException → ERROR log + InternalError + */ + @Test + public void testTaskPersistenceException_ConvertsToInternalError() throws InterruptedException { + // Arrange: Mock TaskStore to throw TaskPersistenceException + String exceptionMessage = "Database operation failed"; + TaskPersistenceException exception = new TaskPersistenceException( + TASK_ID, exceptionMessage + ); + when(mockTaskStore.get(any())).thenThrow(exception); + doThrow(exception).when(mockTaskStore).save(any(Task.class), anyBoolean()); + + Task testTask = createTestTask(); + + // Act: Enqueue event and wait for processing + List distributedEvents = captureDistributedEvent(testTask); + + // Assert: Verify InternalError was distributed + assertEquals(1, distributedEvents.size(), "Should distribute exactly one event"); + Event distributedEvent = distributedEvents.get(0); + assertInstanceOf(InternalError.class, distributedEvent, + "TaskPersistenceException should convert to InternalError"); + + InternalError error = (InternalError) distributedEvent; + assertTrue(error.getMessage().contains(TASK_ID), + "Error message should contain task ID: " + error.getMessage()); + + // Assert: Verify ERROR level logging + boolean foundErrorLog = logAppender.list.stream() + .anyMatch(event -> event.getLevel() == Level.ERROR + && event.getFormattedMessage().contains(TASK_ID) + && event.getFormattedMessage().contains("persistence failed")); + assertTrue(foundErrorLog, "Should log TaskPersistenceException at ERROR level"); + } + + /** + * Test that taskId is preserved through exception chain and appears in error messages. + * AC#5: All tests validate error messages contain taskId and failure type + */ + @Test + public void testTaskIdPreservedInExceptionChain() throws InterruptedException { + // Arrange: Create exception with specific taskId + String specificTaskId = "task-with-unique-id-12345"; + TaskSerializationException exception = new TaskSerializationException( + specificTaskId, "Test exception with specific task ID" + ); + when(mockTaskStore.get(any())).thenThrow(exception); + doThrow(exception).when(mockTaskStore).save(any(Task.class), anyBoolean()); + + // Create event queue with specific taskId + EventQueue specificQueue = EventQueueUtil.getEventQueueBuilder(mainEventBus) + .taskId(specificTaskId) + .mainEventBus(mainEventBus) + .build().tap(); + + Task testTask = Task.builder() + .id(specificTaskId) + .contextId("test-context") + .status(new TaskStatus(TaskState.TASK_STATE_SUBMITTED)) + .build(); + + // Act: Enqueue event and wait for processing + List distributedEvents = new ArrayList<>(); + CountDownLatch latch = new CountDownLatch(1); + mainEventBusProcessor.setCallback(new MainEventBusProcessorCallback() { + @Override + public void onEventProcessed(String taskId, Event event) { + distributedEvents.add(event); + latch.countDown(); + } + + @Override + public void onTaskFinalized(String taskId) { + // No-op for this test + } + }); + + specificQueue.enqueueEvent(testTask); + assertTrue(latch.await(5, TimeUnit.SECONDS), "Event processing should complete within timeout"); + + // Assert: Verify specific taskId appears in distributed error + assertEquals(1, distributedEvents.size()); + InternalError error = (InternalError) distributedEvents.get(0); + assertTrue(error.getMessage().contains(specificTaskId), + "Error should contain specific task ID: " + error.getMessage()); + + // Assert: Verify specific taskId appears in logs + boolean foundTaskIdInLog = logAppender.list.stream() + .anyMatch(event -> event.getFormattedMessage().contains(specificTaskId)); + assertTrue(foundTaskIdInLog, "Logs should contain specific task ID"); + } + + /** + * Helper method to create a test Task. + */ + private Task createTestTask() { + return Task.builder() + .id(TASK_ID) + .contextId("test-context") + .status(new TaskStatus(TaskState.TASK_STATE_SUBMITTED)) + .build(); + } + + /** + * Helper method to enqueue an event and capture what gets distributed to clients. + * Uses MainEventBusProcessorCallback to wait for async processing. + * + * @param event the event to enqueue + * @return list of events distributed to ChildQueues (should be 1 event) + */ + private List captureDistributedEvent(Event event) throws InterruptedException { + List distributedEvents = new ArrayList<>(); + CountDownLatch latch = new CountDownLatch(1); + + mainEventBusProcessor.setCallback(new MainEventBusProcessorCallback() { + @Override + public void onEventProcessed(String taskId, Event processedEvent) { + distributedEvents.add(processedEvent); + latch.countDown(); + } + + @Override + public void onTaskFinalized(String taskId) { + // No-op for exception tests + } + }); + + eventQueue.enqueueEvent(event); + + assertTrue(latch.await(5, TimeUnit.SECONDS), + "Event processing should complete within timeout"); + + return distributedEvents; + } +} diff --git a/server-common/src/test/java/io/a2a/server/tasks/AbstractTaskStoreExceptionTest.java b/server-common/src/test/java/io/a2a/server/tasks/AbstractTaskStoreExceptionTest.java new file mode 100644 index 000000000..9646c24bf --- /dev/null +++ b/server-common/src/test/java/io/a2a/server/tasks/AbstractTaskStoreExceptionTest.java @@ -0,0 +1,168 @@ +package io.a2a.server.tasks; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertSame; + +import org.junit.jupiter.api.Test; + +/** + * Base test class for TaskStore exception validation. + *

    + * Provides reusable test patterns for exception construction, field verification, + * and message formatting. Subclasses must implement {@link #createException} methods + * to test specific exception types. + *

    + * This class is designed to be extended by implementation tests (e.g., InMemoryTaskStore tests) + * to ensure consistent exception behavior across all TaskStore implementations. + * + * @param the exception type being tested (must extend TaskStoreException) + */ +public abstract class AbstractTaskStoreExceptionTest { + + /** + * Creates an exception with a message and taskId. + * Used for testing basic exception construction with task context. + * + * @param taskId the task identifier + * @param message the exception message + * @return the constructed exception + */ + protected abstract T createException(String taskId, String message); + + /** + * Creates an exception with a taskId, message, and cause. + * Used for testing exception chaining with task context. + * + * @param taskId the task identifier + * @param message the exception message + * @param cause the underlying cause + * @return the constructed exception + */ + protected abstract T createException(String taskId, String message, Throwable cause); + + // ========== Task ID Field Tests ========== + + @Test + void testTaskIdField_withTaskId() { + T exception = createException("task-123", "Test message"); + assertEquals("task-123", exception.getTaskId()); + } + + @Test + void testTaskIdField_nullTaskId() { + T exception = createException(null, "Test message"); + assertNull(exception.getTaskId()); + } + + @Test + void testTaskIdField_emptyTaskId() { + T exception = createException("", "Test message"); + assertEquals("", exception.getTaskId()); + } + + // ========== Message Field Tests ========== + + @Test + void testMessageField_nonNull() { + T exception = createException("task-123", "Failed to save task"); + assertNotNull(exception.getMessage()); + assertEquals("Failed to save task", exception.getMessage()); + } + + @Test + void testMessageField_withContext() { + T exception = createException("task-123", "Database connection timeout"); + assertNotNull(exception.getMessage()); + assertEquals("Database connection timeout", exception.getMessage()); + } + + // ========== Cause Chain Tests ========== + + @Test + void testCauseChain_withCause() { + RuntimeException cause = new RuntimeException("Root cause"); + T exception = createException("task-123", "Wrapper message", cause); + + assertNotNull(exception.getCause()); + assertSame(cause, exception.getCause()); + assertEquals("Root cause", exception.getCause().getMessage()); + } + + @Test + void testCauseChain_multipleLevels() { + RuntimeException rootCause = new RuntimeException("Database error"); + IllegalStateException intermediateCause = new IllegalStateException("Transaction failed", rootCause); + T exception = createException("task-123", "Save failed", intermediateCause); + + assertNotNull(exception.getCause()); + assertSame(intermediateCause, exception.getCause()); + assertNotNull(exception.getCause().getCause()); + assertSame(rootCause, exception.getCause().getCause()); + } + + // ========== Exception Inheritance Tests ========== + + @Test + void testInheritance_isTaskStoreException() { + T exception = createException("task-123", "Test message"); + assertNotNull(exception); + // Verified by generic type constraint: T extends TaskStoreException + } + + @Test + void testInheritance_isThrowable() { + T exception = createException("task-123", "Test message"); + Throwable throwable = exception; + assertNotNull(throwable); + } + + // ========== Message Clarity Tests ========== + + /** + * Verifies that exception messages are clear and actionable. + * Subclasses should override this to test domain-specific message patterns. + */ + @Test + void testMessageClarity_basicPattern() { + T exception = createException("task-123", "Operation failed"); + String message = exception.getMessage(); + + assertNotNull(message); + // Message should not be empty + assert !message.trim().isEmpty() : "Exception message should not be empty"; + // Message should not be too short (less than 5 characters is typically not helpful) + assert message.length() >= 5 : "Exception message should be descriptive"; + } + + // ========== Helper Assertions for Subclasses ========== + + /** + * Asserts that an exception message contains expected context information. + * Useful for implementation tests to verify TaskStore-specific message patterns. + * + * @param exception the exception to check + * @param expectedSubstring the expected substring in the message + */ + protected void assertMessageContains(T exception, String expectedSubstring) { + assertNotNull(exception.getMessage()); + assert exception.getMessage().contains(expectedSubstring) + : String.format("Expected message to contain '%s' but was: %s", + expectedSubstring, exception.getMessage()); + } + + /** + * Asserts that an exception has both taskId and cause properly set. + * Useful for implementation tests to verify complete exception context. + * + * @param exception the exception to check + * @param expectedTaskId the expected task ID + * @param expectedCause the expected cause + */ + protected void assertFullContext(T exception, String expectedTaskId, Throwable expectedCause) { + assertEquals(expectedTaskId, exception.getTaskId()); + assertSame(expectedCause, exception.getCause()); + assertNotNull(exception.getMessage()); + } +} diff --git a/server-common/src/test/java/io/a2a/server/tasks/TaskPersistenceExceptionTest.java b/server-common/src/test/java/io/a2a/server/tasks/TaskPersistenceExceptionTest.java new file mode 100644 index 000000000..51ad86dc3 --- /dev/null +++ b/server-common/src/test/java/io/a2a/server/tasks/TaskPersistenceExceptionTest.java @@ -0,0 +1,258 @@ +package io.a2a.server.tasks; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertSame; + +import org.junit.jupiter.api.Test; + +/** + * Unit tests for {@link TaskPersistenceException}. + *

    + * Tests the exception class for database/storage system failures. + */ +class TaskPersistenceExceptionTest extends AbstractTaskStoreExceptionTest { + + @Override + protected TaskPersistenceException createException(String taskId, String message) { + return new TaskPersistenceException(taskId, message); + } + + @Override + protected TaskPersistenceException createException(String taskId, String message, Throwable cause) { + return new TaskPersistenceException(taskId, message, cause); + } + + // ========== Constructor Tests ========== + + @Test + void testConstructor_noArgs() { + TaskPersistenceException exception = new TaskPersistenceException(); + assertNull(exception.getMessage()); + assertNull(exception.getCause()); + assertNull(exception.getTaskId()); + } + + @Test + void testConstructor_messageOnly() { + TaskPersistenceException exception = new TaskPersistenceException("Database error"); + assertEquals("Database error", exception.getMessage()); + assertNull(exception.getCause()); + assertNull(exception.getTaskId()); + } + + @Test + void testConstructor_causeOnly() { + RuntimeException cause = new RuntimeException("Connection failed"); + TaskPersistenceException exception = new TaskPersistenceException(cause); + + assertNotNull(exception.getMessage()); + assertSame(cause, exception.getCause()); + assertNull(exception.getTaskId()); + } + + @Test + void testConstructor_messageAndCause() { + RuntimeException cause = new RuntimeException("Timeout"); + TaskPersistenceException exception = new TaskPersistenceException("Operation failed", cause); + + assertEquals("Operation failed", exception.getMessage()); + assertSame(cause, exception.getCause()); + assertNull(exception.getTaskId()); + } + + @Test + void testConstructor_taskIdAndMessage() { + TaskPersistenceException exception = new TaskPersistenceException("task-123", "Save failed"); + + assertEquals("Save failed", exception.getMessage()); + assertEquals("task-123", exception.getTaskId()); + assertNull(exception.getCause()); + } + + @Test + void testConstructor_taskIdMessageAndCause() { + RuntimeException cause = new RuntimeException("Disk error"); + TaskPersistenceException exception = new TaskPersistenceException( + "task-456", "Persistence failed", cause); + + assertEquals("Persistence failed", exception.getMessage()); + assertEquals("task-456", exception.getTaskId()); + assertSame(cause, exception.getCause()); + } + + // ========== Inheritance Verification ========== + + @Test + void testInheritance_extendsTaskStoreException() { + TaskPersistenceException exception = new TaskPersistenceException("test", "Test message"); + TaskStoreException baseException = exception; + assertNotNull(baseException); + } + + @Test + void testInheritance_taskIdFieldAccessible() { + TaskPersistenceException exception = new TaskPersistenceException("task-xyz", "Test message"); + // Should be accessible via TaskStoreException parent class + assertEquals("task-xyz", exception.getTaskId()); + } + + // ========== Real-World Scenario Tests ========== + + @Test + void testScenario_connectionTimeout() { + RuntimeException timeout = new RuntimeException("Connection timeout after 30s"); + TaskPersistenceException exception = new TaskPersistenceException( + "task-123", "Database connection timeout", timeout); + + assertFullContext(exception, "task-123", timeout); + assertMessageContains(exception, "timeout"); + } + + @Test + void testScenario_deadlock() { + RuntimeException deadlock = new RuntimeException("Deadlock detected"); + TaskPersistenceException exception = new TaskPersistenceException( + "task-456", "Transaction deadlock", deadlock); + + assertMessageContains(exception, "deadlock"); + } + + @Test + void testScenario_lockTimeout() { + RuntimeException lockTimeout = new RuntimeException("Lock wait timeout exceeded"); + TaskPersistenceException exception = new TaskPersistenceException( + "task-789", "Failed to acquire row lock", lockTimeout); + + assertMessageContains(exception, "lock"); + } + + @Test + void testScenario_networkPartition() { + RuntimeException networkError = new RuntimeException("Network unreachable"); + TaskPersistenceException exception = new TaskPersistenceException( + "task-abc", "Database host unreachable due to network partition", networkError); + + assertMessageContains(exception, "network"); + } + + @Test + void testScenario_poolExhausted() { + RuntimeException poolError = new RuntimeException("Connection pool exhausted"); + TaskPersistenceException exception = new TaskPersistenceException( + "task-def", "No database connections available", poolError); + + assertMessageContains(exception, "connections available"); + } + + @Test + void testScenario_diskFull() { + RuntimeException diskError = new RuntimeException("No space left on device"); + TaskPersistenceException exception = new TaskPersistenceException( + "task-123", "Cannot write task: disk full", diskError); + + assertMessageContains(exception, "disk full"); + } + + @Test + void testScenario_uniqueConstraint() { + RuntimeException constraintError = new RuntimeException("Duplicate entry for key 'PRIMARY'"); + TaskPersistenceException exception = new TaskPersistenceException( + "task-456", "Task ID already exists", constraintError); + + assertMessageContains(exception, "already exists"); + } + + @Test + void testScenario_foreignKeyViolation() { + RuntimeException fkError = new RuntimeException("Cannot add or update child row"); + TaskPersistenceException exception = new TaskPersistenceException( + "task-789", "Foreign key constraint violation", fkError); + + assertMessageContains(exception, "constraint"); + } + + @Test + void testScenario_permissionDenied() { + RuntimeException permError = new RuntimeException("Access denied for user 'app'"); + TaskPersistenceException exception = new TaskPersistenceException( + "task-abc", "Insufficient database permissions", permError); + + assertMessageContains(exception, "permission"); + } + + @Test + void testScenario_schemaIncompatibility() { + RuntimeException schemaError = new RuntimeException("Column 'new_field' does not exist"); + TaskPersistenceException exception = new TaskPersistenceException( + "task-def", "Database schema incompatible with application version", schemaError); + + assertMessageContains(exception, "schema"); + } + + @Test + void testScenario_quotaExceeded() { + RuntimeException quotaError = new RuntimeException("Storage quota exceeded"); + TaskPersistenceException exception = new TaskPersistenceException( + "task-ghi", "Database storage limit reached", quotaError); + + assertMessageContains(exception, "storage limit"); + } + + // ========== Message Quality Tests ========== + + @Test + void testMessage_connectionError() { + TaskPersistenceException exception = new TaskPersistenceException( + "task-123", + "Failed to connect to database at jdbc:postgresql://localhost:5432/a2a"); + + assertMessageContains(exception, "connect to database"); + assertMessageContains(exception, "jdbc:postgresql"); + } + + @Test + void testMessage_transactionRollback() { + TaskPersistenceException exception = new TaskPersistenceException( + "task-123", + "Transaction rolled back due to deadlock"); + + assertMessageContains(exception, "Transaction rolled back"); + } + + @Test + void testMessage_constraintViolation() { + TaskPersistenceException exception = new TaskPersistenceException( + "task-123", + "Unique constraint violation: task_id 'task-123' already exists in table 'tasks'"); + + assertMessageContains(exception, "Unique constraint"); + assertMessageContains(exception, "already exists"); + } + + // ========== Cause Chain Tests ========== + + @Test + void testCauseChain_withContext() { + RuntimeException sqlError = new RuntimeException("SQL state: 08006 - Connection failure"); + RuntimeException jdbcError = new RuntimeException("JDBC connection error", sqlError); + TaskPersistenceException exception = new TaskPersistenceException( + "task-xyz", "Database operation failed", jdbcError); + + assertEquals("task-xyz", exception.getTaskId()); + assertSame(jdbcError, exception.getCause()); + assertSame(sqlError, exception.getCause().getCause()); + } + + // ========== Null Safety ========== + + @Test + void testNullSafety_nullTaskId() { + TaskPersistenceException exception = new TaskPersistenceException( + null, "Generic database error"); + + assertNull(exception.getTaskId()); + assertEquals("Generic database error", exception.getMessage()); + } +} diff --git a/server-common/src/test/java/io/a2a/server/tasks/TaskSerializationExceptionTest.java b/server-common/src/test/java/io/a2a/server/tasks/TaskSerializationExceptionTest.java new file mode 100644 index 000000000..df356ef14 --- /dev/null +++ b/server-common/src/test/java/io/a2a/server/tasks/TaskSerializationExceptionTest.java @@ -0,0 +1,272 @@ +package io.a2a.server.tasks; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertSame; + +import org.junit.jupiter.api.Test; + +/** + * Unit tests for {@link TaskSerializationException}. + *

    + * Tests the exception class for task serialization/deserialization failures, + * verifying all constructor variants. + */ +class TaskSerializationExceptionTest extends AbstractTaskStoreExceptionTest { + + @Override + protected TaskSerializationException createException(String taskId, String message) { + return new TaskSerializationException(taskId, message); + } + + @Override + protected TaskSerializationException createException(String taskId, String message, Throwable cause) { + return new TaskSerializationException(taskId, message, cause); + } + + // ========== Constructor Tests ========== + + @Test + void testConstructor_noArgs() { + TaskSerializationException exception = new TaskSerializationException(); + assertNull(exception.getMessage()); + assertNull(exception.getCause()); + assertNull(exception.getTaskId()); + } + + @Test + void testConstructor_messageOnly() { + TaskSerializationException exception = new TaskSerializationException("JSON parsing failed"); + assertEquals("JSON parsing failed", exception.getMessage()); + assertNull(exception.getCause()); + assertNull(exception.getTaskId()); + } + + @Test + void testConstructor_causeOnly() { + RuntimeException cause = new RuntimeException("Invalid JSON format"); + TaskSerializationException exception = new TaskSerializationException(cause); + + assertNotNull(exception.getMessage()); + // Exception message should contain cause class name + assert exception.getMessage().contains("RuntimeException"); + assertSame(cause, exception.getCause()); + assertNull(exception.getTaskId()); + } + + @Test + void testConstructor_messageAndCause() { + RuntimeException cause = new RuntimeException("Unexpected field type"); + TaskSerializationException exception = new TaskSerializationException("Deserialization failed", cause); + + assertEquals("Deserialization failed", exception.getMessage()); + assertSame(cause, exception.getCause()); + assertNull(exception.getTaskId()); + } + + @Test + void testConstructor_taskIdAndMessage() { + TaskSerializationException exception = new TaskSerializationException( + "task-123", "Failed to serialize task"); + + assertEquals("Failed to serialize task", exception.getMessage()); + assertEquals("task-123", exception.getTaskId()); + assertNull(exception.getCause()); + } + + @Test + void testConstructor_taskIdMessageAndCause() { + RuntimeException cause = new RuntimeException("Missing required field"); + TaskSerializationException exception = new TaskSerializationException( + "task-456", "Task deserialization failed", cause); + + assertEquals("Task deserialization failed", exception.getMessage()); + assertEquals("task-456", exception.getTaskId()); + assertSame(cause, exception.getCause()); + } + + // ========== Inheritance Verification ========== + + @Test + void testInheritance_extendsTaskStoreException() { + TaskSerializationException exception = new TaskSerializationException("test", "Test message"); + TaskStoreException baseException = exception; + assertNotNull(baseException); + } + + @Test + void testInheritance_taskIdFieldAccessible() { + TaskSerializationException exception = new TaskSerializationException("task-789", "Test message"); + // Should be accessible via TaskStoreException parent class + assertEquals("task-789", exception.getTaskId()); + } + + // ========== Message Quality Tests - Serialization Context ========== + + @Test + void testMessage_jsonParsingError() { + TaskSerializationException exception = new TaskSerializationException( + "task-123", + "Failed to deserialize task: unexpected token at line 42, column 15"); + + assertNotNull(exception.getMessage()); + assertMessageContains(exception, "deserialize"); + assertMessageContains(exception, "line 42"); + } + + @Test + void testMessage_invalidFieldType() { + TaskSerializationException exception = new TaskSerializationException( + "task-123", + "Field 'status' expected enum TaskState, got string 'INVALID'"); + + assertNotNull(exception.getMessage()); + assertMessageContains(exception, "Field 'status'"); + assertMessageContains(exception, "expected enum"); + } + + @Test + void testMessage_missingRequiredField() { + TaskSerializationException exception = new TaskSerializationException( + "task-123", + "Missing required field 'id' during task deserialization"); + + assertNotNull(exception.getMessage()); + assertMessageContains(exception, "Missing required field"); + assertMessageContains(exception, "'id'"); + } + + @Test + void testMessage_schemaVersionMismatch() { + TaskSerializationException exception = new TaskSerializationException( + "task-123", + "Task schema version 2.0 not compatible with current version 1.0"); + + assertNotNull(exception.getMessage()); + assertMessageContains(exception, "schema version"); + assertMessageContains(exception, "not compatible"); + } + + // ========== Real-World Scenario Tests ========== + + @Test + void testScenario_jsonProcessingException() { + // Simulate Jackson JsonProcessingException wrapping + RuntimeException jsonError = new RuntimeException( + "Unrecognized field \"unknownField\" (class Task), not marked as ignorable"); + TaskSerializationException exception = new TaskSerializationException( + "task-abc", "Failed to deserialize task from JSON", jsonError); + + assertFullContext(exception, "task-abc", jsonError); + assertMessageContains(exception, "Failed to deserialize"); + } + + @Test + void testScenario_enumConversionError() { + RuntimeException enumError = new RuntimeException( + "Cannot deserialize value of type `TaskState` from String \"INVALID_STATE\""); + TaskSerializationException exception = new TaskSerializationException( + "task-def", "Invalid enum value in task JSON", enumError); + + assertFullContext(exception, "task-def", enumError); + assertMessageContains(exception, "Invalid enum value"); + } + + @Test + void testScenario_nullValueError() { + TaskSerializationException exception = new TaskSerializationException( + "task-ghi", + "Required field 'taskId' cannot be null during deserialization"); + + assertEquals("task-ghi", exception.getTaskId()); + assertMessageContains(exception, "cannot be null"); + } + + // ========== Cause Chain for Debugging ========== + + @Test + void testCauseChain_multiLevelSerializationError() { + // Simulate nested serialization error with multiple layers + RuntimeException rootCause = new RuntimeException("Invalid UTF-8 sequence at byte 1024"); + IllegalArgumentException parseError = new IllegalArgumentException("Cannot parse JSON", rootCause); + TaskSerializationException exception = new TaskSerializationException( + "task-xyz", "Task deserialization failed", parseError); + + // Verify full chain for debugging + assertEquals("task-xyz", exception.getTaskId()); + assertSame(parseError, exception.getCause()); + assertSame(rootCause, exception.getCause().getCause()); + assertMessageContains(exception, "deserialization failed"); + } + + // ========== Edge Cases ========== + + @Test + void testEdgeCase_emptyJsonError() { + TaskSerializationException exception = new TaskSerializationException( + "task-empty", "Cannot deserialize empty JSON string"); + + assertEquals("task-empty", exception.getTaskId()); + assertMessageContains(exception, "empty JSON"); + } + + @Test + void testEdgeCase_circularReferenceError() { + TaskSerializationException exception = new TaskSerializationException( + "task-circular", + "Infinite recursion detected during serialization (StackOverflowError)"); + + assertEquals("task-circular", exception.getTaskId()); + assertMessageContains(exception, "Infinite recursion"); + } + + @Test + void testEdgeCase_characterEncodingError() { + RuntimeException encodingError = new RuntimeException("Invalid character encoding"); + TaskSerializationException exception = new TaskSerializationException( + "task-encoding", "Failed to encode task JSON", encodingError); + + assertFullContext(exception, "task-encoding", encodingError); + } + + // ========== Message Actionability Tests ========== + + @Test + void testMessageActionable_providesContext() { + TaskSerializationException exception = new TaskSerializationException( + "task-123", + "Failed to deserialize task: field 'createdAt' expects ISO-8601 timestamp, got '2024-13-45'"); + + // Message should help developer understand the problem + assertMessageContains(exception, "field 'createdAt'"); + assertMessageContains(exception, "expects ISO-8601"); + assertMessageContains(exception, "got '2024-13-45'"); + } + + @Test + void testMessageActionable_suggestsResolution() { + TaskSerializationException exception = new TaskSerializationException( + "task-123", + "Schema version mismatch. Run database migration to update task format to v2.0"); + + assertMessageContains(exception, "Schema version"); + assertMessageContains(exception, "Run database migration"); + } + + // ========== Null Safety ========== + + @Test + void testNullSafety_nullTaskIdWithMessage() { + TaskSerializationException exception = new TaskSerializationException(null, "Generic serialization error"); + assertNull(exception.getTaskId()); + assertEquals("Generic serialization error", exception.getMessage()); + } + + @Test + void testNullSafety_nullMessageWithTaskId() { + TaskSerializationException exception = new TaskSerializationException("task-123", (String) null); + assertEquals("task-123", exception.getTaskId()); + assertNull(exception.getMessage()); + } +} diff --git a/server-common/src/test/java/io/a2a/server/tasks/TaskStoreExceptionTest.java b/server-common/src/test/java/io/a2a/server/tasks/TaskStoreExceptionTest.java new file mode 100644 index 000000000..351e4332f --- /dev/null +++ b/server-common/src/test/java/io/a2a/server/tasks/TaskStoreExceptionTest.java @@ -0,0 +1,191 @@ +package io.a2a.server.tasks; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertSame; + +import org.junit.jupiter.api.Test; + +/** + * Unit tests for {@link TaskStoreException}. + *

    + * Tests the base exception class for TaskStore persistence layer failures, + * verifying all constructor variants and field behavior. + */ +class TaskStoreExceptionTest extends AbstractTaskStoreExceptionTest { + + @Override + protected TaskStoreException createException(String taskId, String message) { + return new TaskStoreException(taskId, message); + } + + @Override + protected TaskStoreException createException(String taskId, String message, Throwable cause) { + return new TaskStoreException(taskId, message, cause); + } + + // ========== Constructor Tests ========== + + @Test + void testConstructor_noArgs() { + TaskStoreException exception = new TaskStoreException(); + assertNull(exception.getMessage()); + assertNull(exception.getCause()); + assertNull(exception.getTaskId()); + } + + @Test + void testConstructor_messageOnly() { + TaskStoreException exception = new TaskStoreException("Failed to persist task"); + assertEquals("Failed to persist task", exception.getMessage()); + assertNull(exception.getCause()); + assertNull(exception.getTaskId()); + } + + @Test + void testConstructor_causeOnly() { + RuntimeException cause = new RuntimeException("Database error"); + TaskStoreException exception = new TaskStoreException(cause); + + assertNotNull(exception.getMessage()); + // Exception message should contain cause class name + assert exception.getMessage().contains("RuntimeException"); + assertSame(cause, exception.getCause()); + assertNull(exception.getTaskId()); + } + + @Test + void testConstructor_messageAndCause() { + RuntimeException cause = new RuntimeException("Database error"); + TaskStoreException exception = new TaskStoreException("Persistence failed", cause); + + assertEquals("Persistence failed", exception.getMessage()); + assertSame(cause, exception.getCause()); + assertNull(exception.getTaskId()); + } + + @Test + void testConstructor_taskIdAndMessage() { + TaskStoreException exception = new TaskStoreException("task-123", "Failed to save"); + + assertEquals("Failed to save", exception.getMessage()); + assertEquals("task-123", exception.getTaskId()); + assertNull(exception.getCause()); + } + + @Test + void testConstructor_taskIdMessageAndCause() { + RuntimeException cause = new RuntimeException("Connection timeout"); + TaskStoreException exception = new TaskStoreException("task-456", "Save operation failed", cause); + + assertEquals("Save operation failed", exception.getMessage()); + assertEquals("task-456", exception.getTaskId()); + assertSame(cause, exception.getCause()); + } + + // ========== Task ID Edge Cases ========== + + @Test + void testTaskId_uuid() { + String uuid = "550e8400-e29b-41d4-a716-446655440000"; + TaskStoreException exception = new TaskStoreException(uuid, "Test message"); + assertEquals(uuid, exception.getTaskId()); + } + + @Test + void testTaskId_numericString() { + TaskStoreException exception = new TaskStoreException("12345", "Test message"); + assertEquals("12345", exception.getTaskId()); + } + + @Test + void testTaskId_specialCharacters() { + String taskId = "task-123_v2.0"; + TaskStoreException exception = new TaskStoreException(taskId, "Test message"); + assertEquals(taskId, exception.getTaskId()); + } + + // ========== Inheritance Verification ========== + + @Test + void testInheritance_extendsA2AServerException() { + TaskStoreException exception = new TaskStoreException("test", "Test message"); + assertNotNull(exception); + // TaskStoreException extends A2AServerException (verified by compilation) + } + + @Test + void testInheritance_isException() { + TaskStoreException exception = new TaskStoreException("test", "Test message"); + Exception e = exception; + assertNotNull(e); + } + + // ========== Message Quality Tests ========== + + @Test + void testMessage_descriptive() { + TaskStoreException exception = new TaskStoreException( + "task-123", + "Failed to persist task to database: connection timeout after 30s"); + + assertNotNull(exception.getMessage()); + assertMessageContains(exception, "Failed to persist"); + assertMessageContains(exception, "database"); + assertMessageContains(exception, "connection timeout"); + } + + @Test + void testMessage_actionable() { + TaskStoreException exception = new TaskStoreException( + "task-123", + "Task not found in store. Verify taskId and retry operation."); + + assertNotNull(exception.getMessage()); + assertMessageContains(exception, "Task not found"); + assertMessageContains(exception, "Verify taskId"); + } + + // ========== Cause Chain Preservation ========== + + @Test + void testCausePreservation_multipleWrapping() { + RuntimeException rootCause = new RuntimeException("Disk full"); + IllegalStateException level1 = new IllegalStateException("Write failed", rootCause); + IllegalArgumentException level2 = new IllegalArgumentException("Validation failed", level1); + TaskStoreException exception = new TaskStoreException("task-789", "Complete failure", level2); + + // Verify full chain + assertEquals("Complete failure", exception.getMessage()); + assertEquals("task-789", exception.getTaskId()); + assertSame(level2, exception.getCause()); + assertSame(level1, exception.getCause().getCause()); + assertSame(rootCause, exception.getCause().getCause().getCause()); + } + + // ========== Null Safety Tests ========== + + @Test + void testNullSafety_nullMessage() { + TaskStoreException exception = new TaskStoreException("task-123", (String) null); + assertEquals("task-123", exception.getTaskId()); + assertNull(exception.getMessage()); + } + + @Test + void testNullSafety_nullCause() { + TaskStoreException exception = new TaskStoreException("task-123", "Message", (Throwable) null); + assertEquals("task-123", exception.getTaskId()); + assertEquals("Message", exception.getMessage()); + assertNull(exception.getCause()); + } + + @Test + void testNullSafety_allNulls() { + TaskStoreException exception = new TaskStoreException(null, (String) null, (Throwable) null); + assertNull(exception.getTaskId()); + assertNull(exception.getMessage()); + assertNull(exception.getCause()); + } +}