HDDS-13891. SCM-based health monitoring and batch processing in Recon#9258
HDDS-13891. SCM-based health monitoring and batch processing in Recon#9258devmadhuu wants to merge 24 commits intoapache:masterfrom
Conversation
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTaskV2.java
Outdated
Show resolved
Hide resolved
...hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
Outdated
Show resolved
Hide resolved
…to run the replication manager logic in Recon itself.
…to run the replication manager logic in Recon itself.
|
Thank you for your contribution. This PR is being closed due to inactivity. If needed, feel free to reopen it. |
| new HashMap<>(); | ||
|
|
||
| // Captures containers with REPLICA_MISMATCH (Recon-specific, not in SCM's HealthState) | ||
| private final List<ContainerID> replicaMismatchContainers = new ArrayList<>(); |
There was a problem hiding this comment.
we can just set sampleLimit to 0 for recon
…ng llimit in recon to 0 and some other review comments.
| .build(); | ||
|
|
||
| try { | ||
| testCluster.waitForClusterToBeReady(); |
There was a problem hiding this comment.
this should be either done in BeforeEach/BeforeAll or moved in the helper function.
initializing mini cluster in BeforeAll and reusing it across all tests would significantly reduce test runtime
|
|
||
| // The actual over-replication detection would look like this: | ||
| // LambdaTestUtils.await(120000, 6000, () -> { | ||
| // List<UnhealthyContainerRecordV2> overReplicatedContainers = |
| containerHealthSchemaManagerV2.getUnhealthyContainers(v2State, minContainerId, maxContainerId, limit); | ||
|
|
||
| // Convert V2 records to response format | ||
| for (ContainerHealthSchemaManagerV2.UnhealthyContainerRecordV2 c : v2Containers) { |
There was a problem hiding this comment.
its better to move the transformation logic into helper function and then do map().collect()
| initializeAndRunTask(); | ||
|
|
||
| // Wait before next run using configured interval | ||
| synchronized (this) { |
|
|
||
| // Wait before next run using configured interval | ||
| synchronized (this) { | ||
| wait(interval); |
There was a problem hiding this comment.
typically if constant interval between tasks is needed, the wait time should be "interval - task_runtime_time"
| // 3. Stores all health states in database | ||
| reconRM.processAll(); | ||
|
|
||
| LOG.info("ContainerHealthTaskV2 completed successfully"); |
There was a problem hiding this comment.
should also expose 'runtime' metric and log end-start time
| /** | ||
| * Builder for creating {@link InitContext} instances. | ||
| */ | ||
| public static final class Builder { |
| * </ul> | ||
| */ | ||
| @Override | ||
| public synchronized void processAll() { |
| public synchronized void processAll() { | ||
| LOG.info("ReconReplicationManager starting container health check"); | ||
|
|
||
| final long startTime = System.currentTimeMillis(); |
| ReconReplicationManagerReport report, | ||
| List<ContainerInfo> allContainers) { | ||
|
|
||
| long currentTime = System.currentTimeMillis(); |
| * @param report The report with all captured container health states | ||
| * @param allContainers List of all containers for cleanup | ||
| */ | ||
| private void storeHealthStatesToDatabase( |
There was a problem hiding this comment.
it would be better to split functions into smaller parts.
|
|
||
| for (int from = 0; from < recs.size(); from += BATCH_INSERT_CHUNK_SIZE) { | ||
| int to = Math.min(from + BATCH_INSERT_CHUNK_SIZE, recs.size()); | ||
| List<UnhealthyContainersV2Record> records = new ArrayList<>(to - from); |
There was a problem hiding this comment.
do we need a new list inside the loop?
| LOG.debug("Batch inserted {} unhealthy container records", recs.size()); | ||
|
|
||
| } catch (DataAccessException e) { | ||
| // Batch insert failed (likely duplicate key) - fall back to insert-or-update per record |
There was a problem hiding this comment.
should be moved into separate function.
| List<UnhealthyContainersV2Record> records = new ArrayList<>(to - from); | ||
| for (int i = from; i < to; i++) { | ||
| UnhealthyContainerRecordV2 rec = recs.get(i); | ||
| UnhealthyContainersV2Record record = txContext.newRecord(UNHEALTHY_CONTAINERS_V2); |
There was a problem hiding this comment.
can all the params to be passed into ctor?
| /** | ||
| * POJO representing a record in UNHEALTHY_CONTAINERS_V2 table. | ||
| */ | ||
| public static class UnhealthyContainerRecordV2 { |
| /** | ||
| * POJO representing a summary record for unhealthy containers. | ||
| */ | ||
| public static class UnhealthyContainersSummaryV2 { |
What changes were proposed in this pull request?
This PR Implements
ContainerHealthTaskV2by extending SCM's ReplicationManager for use in Recon. This approach evaluates container health locally using SCM's proven health check logic without requiring network communication between SCM and Recon.Implementation Approach
Introduces ContainerHealthTaskV2, a new implementation that determines container health states by:
ReplicationManagerasReconReplicationManagerprocessAll()to evaluate all containers using SCM's proven health check logicUNHEALTHY_CONTAINERS_V2tableKey Improvements Over Legacy ContainerHealthTask
ContainerHealthTaskV2 provides significant improvements over the original ContainerHealthTask (V1):
1. Accuracy & Completeness
2. Performance
3. Maintainability
4. Database Schema
5. Benefits Summary
Container Health States Detected
ContainerHealthTaskV2 detects 5 distinct health states:
SCM Health States (Inherited)
Recon-Specific Health State
Implementation: ReconReplicationManager first runs SCM's health checks, then additionally checks for REPLICA_MISMATCH by comparing checksums across replicas. This ensures both replication health and data integrity are monitored.
Testing
UNHEALTHY_CONTAINERS_V2tableDatabase Schema
Uses existing
UNHEALTHY_CONTAINERS_V2table with support for all 5 health states:Each record includes:
Testing
What is the link to the Apache JIRA
https://issues.apache.org/jira/browse/HDDS-13891
How was this patch tested?
Added junit test cases and tested using local docker cluster.