diff --git a/.gitignore b/.gitignore
index 3679c8f0c..441ef20e1 100644
--- a/.gitignore
+++ b/.gitignore
@@ -55,3 +55,4 @@ nbproject/
.serena/
.bob/
claudedocs
+backlog/
diff --git a/extras/task-store-database-jpa/src/main/java/io/a2a/extras/taskstore/database/jpa/JpaDatabaseTaskStore.java b/extras/task-store-database-jpa/src/main/java/io/a2a/extras/taskstore/database/jpa/JpaDatabaseTaskStore.java
index 086e640e3..3a704ee2a 100644
--- a/extras/task-store-database-jpa/src/main/java/io/a2a/extras/taskstore/database/jpa/JpaDatabaseTaskStore.java
+++ b/extras/task-store-database-jpa/src/main/java/io/a2a/extras/taskstore/database/jpa/JpaDatabaseTaskStore.java
@@ -13,6 +13,7 @@
import jakarta.inject.Inject;
import jakarta.persistence.EntityManager;
import jakarta.persistence.PersistenceContext;
+import jakarta.persistence.PersistenceException;
import jakarta.persistence.TypedQuery;
import jakarta.transaction.Transactional;
@@ -22,6 +23,8 @@
import io.a2a.server.config.A2AConfigProvider;
import io.a2a.server.tasks.TaskStateProvider;
import io.a2a.server.tasks.TaskStore;
+import io.a2a.server.tasks.TaskSerializationException;
+import io.a2a.server.tasks.TaskPersistenceException;
import io.a2a.spec.Artifact;
import io.a2a.spec.ListTasksParams;
import io.a2a.spec.Message;
@@ -63,6 +66,7 @@ void initConfig() {
gracePeriodSeconds = Long.parseLong(configProvider.getValue(A2A_REPLICATION_GRACE_PERIOD_SECONDS));
}
+
@Transactional
@Override
public void save(Task task, boolean isReplicated) {
@@ -85,7 +89,12 @@ public void save(Task task, boolean isReplicated) {
}
} catch (JsonProcessingException e) {
LOGGER.error("Failed to serialize task with ID: {}", task.id(), e);
- throw new RuntimeException("Failed to serialize task with ID: " + task.id(), e);
+ throw new TaskSerializationException(task.id(),
+ "Failed to serialize task for persistence", e);
+ } catch (PersistenceException e) {
+ LOGGER.error("Database save failed for task with ID: {}", task.id(), e);
+ throw new TaskPersistenceException(task.id(),
+ "Database save failed for task", e);
}
}
@@ -93,19 +102,27 @@ public void save(Task task, boolean isReplicated) {
@Override
public Task get(String taskId) {
LOGGER.debug("Retrieving task with ID: {}", taskId);
- JpaTask jpaTask = em.find(JpaTask.class, taskId);
- if (jpaTask == null) {
- LOGGER.debug("Task not found with ID: {}", taskId);
- return null;
- }
-
try {
- Task task = jpaTask.getTask();
- LOGGER.debug("Successfully retrieved task with ID: {}", taskId);
- return task;
- } catch (JsonProcessingException e) {
- LOGGER.error("Failed to deserialize task with ID: {}", taskId, e);
- throw new RuntimeException("Failed to deserialize task with ID: " + taskId, e);
+ JpaTask jpaTask = em.find(JpaTask.class, taskId);
+ if (jpaTask == null) {
+ LOGGER.debug("Task not found with ID: {}", taskId);
+ return null;
+ }
+
+ try {
+ Task task = jpaTask.getTask();
+ LOGGER.debug("Successfully retrieved task with ID: {}", taskId);
+ return task;
+ } catch (JsonProcessingException e) {
+ LOGGER.error("Failed to deserialize task with ID: {}", taskId, e);
+ throw new TaskSerializationException(taskId,
+ "Failed to deserialize task from database", e);
+ }
+
+ } catch (PersistenceException e) {
+ LOGGER.error("Database retrieval failed for task with ID: {}", taskId, e);
+ throw new TaskPersistenceException(taskId,
+ "Database retrieval failed for task", e);
}
}
@@ -113,12 +130,18 @@ public Task get(String taskId) {
@Override
public void delete(String taskId) {
LOGGER.debug("Deleting task with ID: {}", taskId);
- JpaTask jpaTask = em.find(JpaTask.class, taskId);
- if (jpaTask != null) {
- em.remove(jpaTask);
- LOGGER.debug("Successfully deleted task with ID: {}", taskId);
- } else {
- LOGGER.debug("Task not found for deletion with ID: {}", taskId);
+ try {
+ JpaTask jpaTask = em.find(JpaTask.class, taskId);
+ if (jpaTask != null) {
+ em.remove(jpaTask);
+ LOGGER.debug("Successfully deleted task with ID: {}", taskId);
+ } else {
+ LOGGER.debug("Task not found for deletion with ID: {}", taskId);
+ }
+ } catch (PersistenceException e) {
+ LOGGER.error("Database deletion failed for task with ID: {}", taskId, e);
+ throw new TaskPersistenceException(taskId,
+ "Database deletion failed for task", e);
}
}
@@ -231,117 +254,126 @@ public ListTasksResult list(ListTasksParams params) {
LOGGER.debug("Listing tasks with params: contextId={}, status={}, pageSize={}, pageToken={}",
params.contextId(), params.status(), params.pageSize(), params.pageToken());
- // Parse pageToken once at the beginning
- PageToken pageToken = PageToken.fromString(params.pageToken());
- Instant tokenTimestamp = pageToken != null ? pageToken.timestamp() : null;
- String tokenId = pageToken != null ? pageToken.id() : null;
-
- // Build dynamic JPQL query with WHERE clauses for filtering
- StringBuilder queryBuilder = new StringBuilder("SELECT t FROM JpaTask t WHERE 1=1");
- StringBuilder countQueryBuilder = new StringBuilder("SELECT COUNT(t) FROM JpaTask t WHERE 1=1");
-
- // Apply contextId filter using denormalized column
- if (params.contextId() != null) {
- queryBuilder.append(" AND t.contextId = :contextId");
- countQueryBuilder.append(" AND t.contextId = :contextId");
- }
+ try {
+ // Parse pageToken once at the beginning
+ PageToken pageToken = PageToken.fromString(params.pageToken());
+ Instant tokenTimestamp = pageToken != null ? pageToken.timestamp() : null;
+ String tokenId = pageToken != null ? pageToken.id() : null;
+
+ // Build dynamic JPQL query with WHERE clauses for filtering
+ StringBuilder queryBuilder = new StringBuilder("SELECT t FROM JpaTask t WHERE 1=1");
+ StringBuilder countQueryBuilder = new StringBuilder("SELECT COUNT(t) FROM JpaTask t WHERE 1=1");
+
+ // Apply contextId filter using denormalized column
+ if (params.contextId() != null) {
+ queryBuilder.append(" AND t.contextId = :contextId");
+ countQueryBuilder.append(" AND t.contextId = :contextId");
+ }
- // Apply status filter using denormalized column
- if (params.status() != null) {
- queryBuilder.append(" AND t.state = :state");
- countQueryBuilder.append(" AND t.state = :state");
- }
+ // Apply status filter using denormalized column
+ if (params.status() != null) {
+ queryBuilder.append(" AND t.state = :state");
+ countQueryBuilder.append(" AND t.state = :state");
+ }
- // Apply statusTimestampAfter filter using denormalized timestamp column
- if (params.statusTimestampAfter() != null) {
- queryBuilder.append(" AND t.statusTimestamp > :statusTimestampAfter");
- countQueryBuilder.append(" AND t.statusTimestamp > :statusTimestampAfter");
- }
+ // Apply statusTimestampAfter filter using denormalized timestamp column
+ if (params.statusTimestampAfter() != null) {
+ queryBuilder.append(" AND t.statusTimestamp > :statusTimestampAfter");
+ countQueryBuilder.append(" AND t.statusTimestamp > :statusTimestampAfter");
+ }
- // Apply pagination cursor using keyset pagination for composite sort (timestamp DESC, id ASC)
- if (tokenTimestamp != null) {
- // Keyset pagination: get tasks where timestamp < tokenTimestamp OR (timestamp = tokenTimestamp AND id > tokenId)
- queryBuilder.append(" AND (t.statusTimestamp < :tokenTimestamp OR (t.statusTimestamp = :tokenTimestamp AND t.id > :tokenId))");
- }
+ // Apply pagination cursor using keyset pagination for composite sort (timestamp DESC, id ASC)
+ if (tokenTimestamp != null) {
+ // Keyset pagination: get tasks where timestamp < tokenTimestamp OR (timestamp = tokenTimestamp AND id > tokenId)
+ queryBuilder.append(" AND (t.statusTimestamp < :tokenTimestamp OR (t.statusTimestamp = :tokenTimestamp AND t.id > :tokenId))");
+ }
- // Sort by status timestamp descending (most recent first), then by ID for stable ordering
- queryBuilder.append(" ORDER BY t.statusTimestamp DESC, t.id ASC");
+ // Sort by status timestamp descending (most recent first), then by ID for stable ordering
+ queryBuilder.append(" ORDER BY t.statusTimestamp DESC, t.id ASC");
- // Create and configure the main query
- TypedQuery
Processing continues after errors - the failed event is distributed as InternalError + * to all ChildQueues, and the MainEventBusProcessor continues consuming subsequent events.
*/ @ApplicationScoped public class MainEventBusProcessor implements Runnable { @@ -293,11 +307,26 @@ private boolean updateTaskStore(String taskId, Event event, boolean isReplicated LOGGER.debug("TaskStore updated via TaskManager.process() for task {}: {} (final: {}, replicated: {})", taskId, event.getClass().getSimpleName(), isFinal, isReplicated); return isFinal; + + } catch (TaskSerializationException e) { + // Data corruption or schema mismatch - ALWAYS permanent + LOGGER.error("Task {} event serialization failed - data corruption detected: {}", + taskId, e.getMessage(), e); + throw new InternalError("Failed to serialize task " + taskId + ": " + e.getMessage()); + + } catch (TaskPersistenceException e) { + // Database/storage failure + LOGGER.error("Task {} event persistence failed: {}", taskId, e.getMessage(), e); + throw new InternalError("Storage failure for task " + taskId + ": " + e.getMessage()); + } catch (InternalError e) { + // Already an InternalError from TaskManager validation - pass through LOGGER.error("Error updating TaskStore via TaskManager for task {}", taskId, e); // Rethrow to prevent distributing unpersisted event to clients throw e; + } catch (Exception e) { + // Unexpected exception type - treat as permanent failure LOGGER.error("Unexpected error updating TaskStore for task {}", taskId, e); // Rethrow to prevent distributing unpersisted event to clients throw new InternalError("TaskStore persistence failed: " + e.getMessage()); diff --git a/server-common/src/main/java/io/a2a/server/tasks/InMemoryTaskStore.java b/server-common/src/main/java/io/a2a/server/tasks/InMemoryTaskStore.java index 15f94d7e8..7c443b1cc 100644 --- a/server-common/src/main/java/io/a2a/server/tasks/InMemoryTaskStore.java +++ b/server-common/src/main/java/io/a2a/server/tasks/InMemoryTaskStore.java @@ -25,6 +25,62 @@ ** This is the default TaskStore used when no other implementation is provided. *
+ * + *{@code
+ * // Example: Delete finalized tasks older than 48 hours
+ * ListTasksParams params = new ListTasksParams.Builder()
+ * .statusTimestampBefore(Instant.now().minus(Duration.ofHours(48)))
+ * .build();
+ *
+ * List oldTasks = taskStore.list(params).tasks();
+ * oldTasks.stream()
+ * .filter(task -> task.status().state().isFinal())
+ * .forEach(task -> taskStore.delete(task.id()));
+ * }
+ *
+ * + * Indicates failures in the underlying storage system (database, filesystem, etc.) rather + * than data format issues. + * + *
{@code
+ * try {
+ * em.merge(jpaTask);
+ * } catch (PersistenceException e) {
+ * throw new TaskPersistenceException(taskId, "Database save failed", e);
+ * }
+ * }
+ *
+ * @see TaskStoreException
+ * @see TaskSerializationException for data format errors
+ */
+public class TaskPersistenceException extends TaskStoreException {
+
+ /**
+ * Creates a new TaskPersistenceException with no message or cause.
+ */
+ public TaskPersistenceException() {
+ super();
+ }
+
+ /**
+ * Creates a new TaskPersistenceException with the specified message.
+ *
+ * @param msg the exception message
+ */
+ public TaskPersistenceException(final String msg) {
+ super(msg);
+ }
+
+ /**
+ * Creates a new TaskPersistenceException with the specified cause.
+ *
+ * @param cause the underlying cause
+ */
+ public TaskPersistenceException(final Throwable cause) {
+ super(cause);
+ }
+
+ /**
+ * Creates a new TaskPersistenceException with the specified message and cause.
+ *
+ * @param msg the exception message
+ * @param cause the underlying cause
+ */
+ public TaskPersistenceException(final String msg, final Throwable cause) {
+ super(msg, cause);
+ }
+
+ /**
+ * Creates a new TaskPersistenceException with the specified task ID and message.
+ *
+ * @param taskId the task identifier (may be null for operations not tied to a specific task)
+ * @param msg the exception message
+ */
+ public TaskPersistenceException(@Nullable final String taskId, final String msg) {
+ super(taskId, msg);
+ }
+
+ /**
+ * Creates a new TaskPersistenceException with the specified task ID, message, and cause.
+ *
+ * @param taskId the task identifier (may be null for operations not tied to a specific task)
+ * @param msg the exception message
+ * @param cause the underlying cause
+ */
+ public TaskPersistenceException(@Nullable final String taskId, final String msg, final Throwable cause) {
+ super(taskId, msg, cause);
+ }
+}
diff --git a/server-common/src/main/java/io/a2a/server/tasks/TaskSerializationException.java b/server-common/src/main/java/io/a2a/server/tasks/TaskSerializationException.java
new file mode 100644
index 000000000..5916ce530
--- /dev/null
+++ b/server-common/src/main/java/io/a2a/server/tasks/TaskSerializationException.java
@@ -0,0 +1,87 @@
+package io.a2a.server.tasks;
+
+import org.jspecify.annotations.Nullable;
+
+/**
+ * Exception for task serialization/deserialization failures.
+ * + * Indicates failures converting between Task domain objects and persistent storage format (JSON). + * + *
{@code
+ * try {
+ * Task task = jsonMapper.readValue(json, Task.class);
+ * } catch (JsonProcessingException e) {
+ * throw new TaskSerializationException(taskId, "Failed to deserialize task", e);
+ * }
+ * }
+ *
+ * @see TaskStoreException
+ * @see TaskPersistenceException for database failures
+ */
+public class TaskSerializationException extends TaskStoreException {
+
+ /**
+ * Creates a new TaskSerializationException with no message or cause.
+ */
+ public TaskSerializationException() {
+ super();
+ }
+
+ /**
+ * Creates a new TaskSerializationException with the specified message.
+ *
+ * @param msg the exception message
+ */
+ public TaskSerializationException(final String msg) {
+ super(msg);
+ }
+
+ /**
+ * Creates a new TaskSerializationException with the specified cause.
+ *
+ * @param cause the underlying cause
+ */
+ public TaskSerializationException(final Throwable cause) {
+ super(cause);
+ }
+
+ /**
+ * Creates a new TaskSerializationException with the specified message and cause.
+ *
+ * @param msg the exception message
+ * @param cause the underlying cause
+ */
+ public TaskSerializationException(final String msg, final Throwable cause) {
+ super(msg, cause);
+ }
+
+ /**
+ * Creates a new TaskSerializationException with the specified task ID and message.
+ *
+ * @param taskId the task identifier (may be null for operations not tied to a specific task)
+ * @param msg the exception message
+ */
+ public TaskSerializationException(@Nullable final String taskId, final String msg) {
+ super(taskId, msg);
+ }
+
+ /**
+ * Creates a new TaskSerializationException with the specified task ID, message, and cause.
+ *
+ * @param taskId the task identifier (may be null for operations not tied to a specific task)
+ * @param msg the exception message
+ * @param cause the underlying cause
+ */
+ public TaskSerializationException(@Nullable final String taskId, final String msg, final Throwable cause) {
+ super(taskId, msg, cause);
+ }
+}
diff --git a/server-common/src/main/java/io/a2a/server/tasks/TaskStore.java b/server-common/src/main/java/io/a2a/server/tasks/TaskStore.java
index 3df903f77..7ae7a46bf 100644
--- a/server-common/src/main/java/io/a2a/server/tasks/TaskStore.java
+++ b/server-common/src/main/java/io/a2a/server/tasks/TaskStore.java
@@ -90,10 +90,64 @@
* {@code
+ * @Override
+ * public void save(Task task, boolean isReplicated) {
+ * try {
+ * String json = objectMapper.writeValueAsString(task);
+ * } catch (JsonProcessingException e) {
+ * throw new TaskSerializationException(task.id(), "Failed to serialize task", e);
+ * }
+ *
+ * try {
+ * entityManager.merge(toEntity(json));
+ * } catch (PersistenceException e) {
+ * throw new TaskPersistenceException(task.id(), "Database save failed", e);
+ * }
+ * }
+ * }
+ *
+ * + * Root exception for all task storage and retrieval errors. Specialized subclasses + * provide specific failure contexts: + *
+ * Tests verify that TaskStore persistence failures are converted to InternalError events + * and distributed to clients with appropriate logging based on failure type: + *
+ * Provides reusable test patterns for exception construction, field verification, + * and message formatting. Subclasses must implement {@link #createException} methods + * to test specific exception types. + *
+ * This class is designed to be extended by implementation tests (e.g., InMemoryTaskStore tests)
+ * to ensure consistent exception behavior across all TaskStore implementations.
+ *
+ * @param
+ * Tests the exception class for database/storage system failures.
+ */
+class TaskPersistenceExceptionTest extends AbstractTaskStoreExceptionTest
+ * Tests the exception class for task serialization/deserialization failures,
+ * verifying all constructor variants.
+ */
+class TaskSerializationExceptionTest extends AbstractTaskStoreExceptionTest
+ * Tests the base exception class for TaskStore persistence layer failures,
+ * verifying all constructor variants and field behavior.
+ */
+class TaskStoreExceptionTest extends AbstractTaskStoreExceptionTest