
feat: Add file_converter extension module (Issue #54) #56

Open
vaibhav45sktech wants to merge 6 commits into dbpedia:main from vaibhav45sktech:feature/file-converter-extension

Conversation

@vaibhav45sktech
Contributor

@vaibhav45sktech vaibhav45sktech commented Feb 16, 2026

Description

This PR adds a new file_converter.py extension module to address Issue #54. The module provides a streaming pipeline for file format conversion with support for gzip decompression and checksum validation.

Changes

  • Create new databusclient/extensions/file_converter.py module
  • Implement FileConverter class with streaming support
  • Add decompress_gzip_stream() method with optional checksum validation
  • Add compress_gzip_stream() method for gzip compression
  • Add validate_checksum_stream() method for SHA256 checksum validation (see the usage sketch after this list)
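
A minimal usage sketch of the three methods above, based on the signatures discussed in the review comments below; the argument order of compress_gzip_stream and the use of in-memory buffers are assumptions, not the module's documented API:

import hashlib
import io

from databusclient.extensions.file_converter import FileConverter

original = b"example payload\n" * 100

# compress into an in-memory buffer (assumed order: input stream, output stream)
compressed = io.BytesIO()
FileConverter.compress_gzip_stream(io.BytesIO(original), compressed)

# validate the SHA-256 of the compressed bytes before decompressing
expected = hashlib.sha256(compressed.getvalue()).hexdigest()
compressed.seek(0)
FileConverter.validate_checksum_stream(compressed, expected)  # True, or raises IOError

# decompress; validate_checksum=True also returns a digest of the decompressed output
compressed.seek(0)
restored = io.BytesIO()
digest = FileConverter.decompress_gzip_stream(compressed, restored, validate_checksum=True)
assert restored.getvalue() == original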

Related Issues
Fixes #54

Type of change

  • New feature (non-breaking change which adds functionality)

Summary by CodeRabbit

  • New Features

    • Multi-format streaming conversion: gz, bz2, xz (and optional zstd), plus a "none" uncompressed option.
    • Streaming compress/decompress and format detection utilities; filename conversion handling.
    • CLI: new --decompress flag and convert-to now accepts "none".
    • Optional checksum validation during streaming operations.
  • Tests

    • Comprehensive tests covering detection, streaming conversion, checksum validation, CLI behavior, and optional zstd.

- Create new file_converter.py extension module in databusclient/extensions/
- Implements FileConverter class with streaming pipeline support
- Supports gzip decompression with optional checksum validation
- Provides methods for compress_gzip_stream and decompress_gzip_stream
- Minimal version as suggested in issue to start with gzip + checksum
- Can be extended later to support more compression formats
@coderabbitai

coderabbitai bot commented Feb 16, 2026

📝 Walkthrough

Adds a new streaming FileConverter extension (multi-format compress/decompress, detect, checksum, convert utilities) and integrates it into the download CLI and API flow; adds tests for conversion and updates package exports to expose FileConverter.

Changes

  • FileConverter extension (databusclient/extensions/file_converter.py): New comprehensive streaming file conversion module: format detection (by name and magic), gzip/bz2/xz streaming compress/decompress, optional zstd support, checksum validation, convert_stream/convert_file helpers, and filename utilities.
  • Extensions package exports (databusclient/extensions/__init__.py): Re-exports FileConverter, COMPRESSION_EXTENSIONS, COMPRESSION_MODULES, and MAGIC_NUMBERS from the new module; updates __all__.
  • Download logic refactor (databusclient/api/download.py): Rewrote download internals to use FileConverter: detection, streaming conversion paths, checksum extraction/validation, vault token exchange for certain hosts, and reorganized SPARQL and URL handling. Watch conversion branching (streaming vs non-streaming) and auth/token exchange paths.
  • CLI changes (databusclient/cli.py): Adds a --decompress flag and allows the convert-to value none; enforces mutual exclusion and maps --decompress to convert_to='none'; updates help text and option handling. (See the option-handling sketch after this list.)
  • Compression-related tests updated (tests/test_compression_conversion.py): Adjusted tests to expect "none" for no-extension detection and updated related assertions.
  • FileConverter tests (tests/test_file_converter.py): New extensive test suite covering detection, streaming compress/decompress, conversions, checksum validation, filename handling, error/cleanup paths, CLI decompress behavior, and optional zstd tests.
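
A hypothetical click sketch of the CLI option handling described above; the real cli.py certainly differs in wiring and command arguments, so treat everything beyond the --decompress and --convert-to names and the mapping to convert_to='none' as assumptions:

import click

@click.command()
@click.option("--convert-to", "convert_to", default=None,
              help="Target compression format (gz, bz2, xz, zstd, or 'none').")
@click.option("--decompress", is_flag=True, default=False,
              help="Shorthand for --convert-to none.")
def download(convert_to, decompress):
    # the two options are mutually exclusive
    if decompress and convert_to is not None:
        raise click.UsageError("--decompress cannot be combined with --convert-to")
    if decompress:
        convert_to = "none"  # --decompress maps to convert_to='none'
    click.echo(f"effective convert_to: {convert_to}")

if __name__ == "__main__":
    download()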

Sequence Diagram(s)

sequenceDiagram
    participant CLI as CLI
    participant Download as Download API
    participant Vault as Vault/Auth
    participant Remote as Remote Server (artifact)
    participant Converter as FileConverter
    participant FS as Local filesystem

    CLI->>Download: request download (convert_to / decompress)
    Download->>Vault: request token exchange (if required)
    Vault-->>Download: bearer token
    Download->>Remote: GET artifact (with token if any) [stream]
    Remote-->>Download: streamed bytes
    Download->>Converter: stream bytes + source_format,target_format,validate_checksum
    Converter-->>Download: converted stream (on-the-fly) / checksum result
    Download->>FS: write final file (or temp + cleanup)
    FS-->>CLI: success / error

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes


🚥 Pre-merge checks | ✅ 1 | ❌ 4

❌ Failed checks (4 warnings)

  • Description check ⚠️ Warning: The PR description is incomplete; it lacks key checklist items (code review, documentation, testing, linting verification) and provides minimal detail of the implemented features beyond the stated objectives. Resolution: complete the template checklist by explicitly marking items as done or not applicable, and add details on the testing and code-quality verification performed.
  • Linked Issues check ⚠️ Warning: The PR partially addresses Issue #54's requirements but significantly diverges from the primary objective of integrating conversion into the download pipeline, as the reviewer noted. Resolution: integrate the FileConverter class into databusclient/api/download.py to enable on-the-fly streaming conversion during downloads, fulfilling the core requirement of improving performance via a combined download-conversion step.
  • Out of Scope Changes check ⚠️ Warning: The PR introduces changes beyond the stated description (bz2, xz, zstd support, download.py refactoring, CLI enhancements) that were not mentioned initially. Resolution: either document these additional features in the PR description or clarify their necessity relative to Issue #54's original scope of a gzip-focused minimal implementation.
  • Docstring Coverage ⚠️ Warning: Docstring coverage is 57.14%, below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (1 passed)
  • Title check ✅ Passed: The title clearly and specifically describes the main addition: a new file_converter extension module addressing Issue #54.



@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@databusclient/extensions/file_converter.py`:
- Around line 38-50: The code creates source_hasher from expected_checksum but
never uses it, so remove the unused expected_checksum parameter and the
source_hasher/hash-from-compressed-stream logic from the FileConverter method
that reads gzip (the block using gzip.open, FileConverter.CHUNK_SIZE, hasher,
and output_stream), keep only validate_checksum-driven hasher for decompressed
chunks, and update the method signature and return value accordingly; then
update callers to perform checksum validation on the compressed input stream
(e.g., via validate_checksum_stream) before calling this decompression routine.
🧹 Nitpick comments (1)
databusclient/extensions/file_converter.py (1)

70-103: seek(0) assumes a seekable stream; return type is misleading.

Two concerns:

  1. Line 88: input_stream.seek(0) will raise on non-seekable streams (e.g., network response bodies, pipes). Since the PR objective targets integration with the download pipeline, callers will need to be aware of this constraint. Consider either documenting the seekable requirement, removing the seek(0) call (let the caller manage stream position), or accepting a non-seekable stream and removing the seek.

  2. Return type: The method signature says -> bool but it never returns False — it either returns True or raises IOError. Consider returning bool without raising (let the caller decide), or changing the return type to None and only raising on failure. Pick one contract and be consistent.

Suggested approach: remove seek, return bool without raising
     `@staticmethod`
     def validate_checksum_stream(
         input_stream: BinaryIO,
         expected_checksum: str
     ) -> bool:
         """Validate SHA256 checksum of a stream.
 
         Args:
-            input_stream: Input stream to validate
+            input_stream: Input stream to validate (must be positioned at start)
             expected_checksum: Expected SHA256 checksum
 
         Returns:
-            True if checksum matches
+            True if checksum matches, False otherwise
-
-        Raises:
-            IOError: If checksum validation fails
         """
         hasher = hashlib.sha256()
-        input_stream.seek(0)
-        
+
         while True:
             chunk = input_stream.read(FileConverter.CHUNK_SIZE)
             if not chunk:
                 break
             hasher.update(chunk)
-        
+
         computed = hasher.hexdigest()
-        if computed.lower() != expected_checksum.lower():
-            raise IOError(
-                f"Checksum mismatch: expected {expected_checksum}, "
-                f"got {computed}"
-            )
-        
-        return True
+        return computed.lower() == expected_checksum.lower()


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🧹 Nitpick comments (3)
databusclient/extensions/file_converter.py (3)

102-105: IOError is semantically incorrect for a checksum mismatch; consider ValueError.

IOError (aliased to OSError in Python 3) conventionally signals operating-system-level I/O failures (file not found, disk full, permission denied). A checksum mismatch is a data-integrity error — ValueError or a custom ChecksumMismatchError would let callers distinguish between a genuine I/O failure and bad data without catching all OSErrors.

♻️ Proposed change
-            raise IOError(
+            raise ValueError(
                 f"Checksum mismatch: expected {expected_checksum}, "
                 f"got {computed}"
             )

Also update the docstring:

-        Raises:
-            IOError: If checksum validation fails
+        Raises:
+            ValueError: If checksum validation fails
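
If the custom-exception route mentioned above is preferred over a plain ValueError, a minimal sketch could look like this; the class name and placement are suggestions, not existing code:

class ChecksumMismatchError(ValueError):
    """Raised when a computed digest does not match the expected checksum."""

# inside the checksum check:
#     raise ChecksumMismatchError(
#         f"Checksum mismatch: expected {expected_checksum}, got {computed}"
#     )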
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@databusclient/extensions/file_converter.py` around lines 102 - 105, Replace
the semantically incorrect IOError raised for checksum mismatches with a more
appropriate exception: change the raise in file_converter.py (the
checksum-checking block that currently raises IOError("Checksum mismatch:
expected..., got ...")) to raise ValueError with the same message, or define and
raise a custom ChecksumMismatchError class and use that instead; also update the
surrounding function/class docstring (the docstring for the checksum
verification routine in file_converter.py) to document the new exception type so
callers know to catch ValueError or ChecksumMismatchError.

21-21: validate_checksum parameter name should be compute_checksum.

The parameter only computes and returns the digest — it performs no comparison. The docstring correctly describes the behavior as "compute", but the parameter name implies validation. The past review had proposed this rename; it wasn't carried through.

♻️ Proposed rename
     def decompress_gzip_stream(
         input_stream: BinaryIO,
         output_stream: BinaryIO,
-        validate_checksum: bool = False,
+        compute_checksum: bool = False,
     ) -> Optional[str]:
         """Decompress gzip stream with optional checksum computation.
         ...
             validate_checksum: Whether to compute a SHA-256 checksum of
+            compute_checksum: Whether to compute a SHA-256 checksum of
                 the decompressed output.
         ...
         """
-        hasher = hashlib.sha256() if validate_checksum else None
+        hasher = hashlib.sha256() if compute_checksum else None
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@databusclient/extensions/file_converter.py` at line 21, Rename the parameter
validate_checksum to compute_checksum throughout the file to match behaviour
described in the docstring: update the function/method signature(s) that
currently declare validate_checksum (and any default value False) to
compute_checksum: bool = False, update all internal variable references and any
return/tuple keys or comments that use validate_checksum to compute_checksum,
and update any call sites inside databusclient/extensions/file_converter.py (and
its unit tests if present) so callers use the new name; ensure type hints,
docstring example/parameter list, and any logging/messages reflect the new name.

12-107: No tests provided for the new module.

The PR adds a non-trivial streaming pipeline but no unit tests. At minimum, these cases should be covered:

  • Round-trip: compress → decompress restores original bytes.
  • validate_checksum_stream passes on a correct hash and raises on a bad hash.
  • decompress_gzip_stream with validate_checksum=True returns the correct hex digest.
  • Non-seekable stream handling for validate_checksum_stream.

Would you like me to generate a tests/test_file_converter.py skeleton covering the above cases, or open a follow-up issue to track this?
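
A minimal pytest skeleton along the lines suggested above; the method names follow the PR description, while the exact signatures and the IOError-on-mismatch contract are taken from the code as reviewed here and may change:

import hashlib
import io

import pytest

from databusclient.extensions.file_converter import FileConverter


def test_gzip_round_trip():
    original = b"some payload\n" * 50
    compressed, restored = io.BytesIO(), io.BytesIO()
    FileConverter.compress_gzip_stream(io.BytesIO(original), compressed)
    compressed.seek(0)
    FileConverter.decompress_gzip_stream(compressed, restored)
    assert restored.getvalue() == original


def test_validate_checksum_stream():
    data = b"checksum me"
    good = hashlib.sha256(data).hexdigest()
    assert FileConverter.validate_checksum_stream(io.BytesIO(data), good) is True
    with pytest.raises(IOError):
        FileConverter.validate_checksum_stream(io.BytesIO(data), "0" * 64)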

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@databusclient/extensions/file_converter.py` around lines 12 - 107, Add unit
tests for the FileConverter class to cover the streaming pipeline: create
tests/test_file_converter.py and include (1) a round-trip test that writes
random bytes, uses FileConverter.compress_gzip_stream to compress into a buffer
and then FileConverter.decompress_gzip_stream to decompress and assert original
bytes are restored; (2) tests for validate_checksum_stream that assert True on a
correct SHA-256 and that an IOError is raised on a bad hash; (3) a test that
calls FileConverter.decompress_gzip_stream with validate_checksum=True and
asserts the returned hex digest equals the SHA-256 of the decompressed bytes;
and (4) a test for non-seekable input to validate_checksum_stream using a custom
non-seekable wrapper (or io.BufferedReader over a pipe-like object) to ensure
validation still works without calling seek; use BinaryIO-compatible buffers
(io.BytesIO) and reference FileConverter.compress_gzip_stream,
FileConverter.decompress_gzip_stream, and FileConverter.validate_checksum_stream
in assertions.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@databusclient/extensions/file_converter.py`:
- Around line 91-107: validate_checksum_stream currently reads to EOF and
returns without resetting the stream, which breaks the validate-then-decompress
pattern used by decompress_gzip_stream; change validate_checksum_stream to seek
input_stream back to 0 before returning and update its docstring to state that
input_stream must be seekable (and that the function will reset the stream
position), so callers like FileConverter.decompress_gzip_stream can safely read
from the start after validation.
- Around line 12-13: The package API currently doesn't export FileConverter from
the extensions package, forcing consumers to import
databusclient.extensions.file_converter.FileConverter directly; update
extensions/__init__.py to import and export FileConverter so it can be accessed
as databusclient.extensions.FileConverter (e.g., add "from .file_converter
import FileConverter" and include it in __all__), or document that only direct
module imports are supported—modify the __init__ export to reference the
FileConverter class name to restore the expected package-level import.

---

Nitpick comments:
In `@databusclient/extensions/file_converter.py`:
- Around line 102-105: Replace the semantically incorrect IOError raised for
checksum mismatches with a more appropriate exception: change the raise in
file_converter.py (the checksum-checking block that currently raises
IOError("Checksum mismatch: expected..., got ...")) to raise ValueError with the
same message, or define and raise a custom ChecksumMismatchError class and use
that instead; also update the surrounding function/class docstring (the
docstring for the checksum verification routine in file_converter.py) to
document the new exception type so callers know to catch ValueError or
ChecksumMismatchError.
- Line 21: Rename the parameter validate_checksum to compute_checksum throughout
the file to match behaviour described in the docstring: update the
function/method signature(s) that currently declare validate_checksum (and any
default value False) to compute_checksum: bool = False, update all internal
variable references and any return/tuple keys or comments that use
validate_checksum to compute_checksum, and update any call sites inside
databusclient/extensions/file_converter.py (and its unit tests if present) so
callers use the new name; ensure type hints, docstring example/parameter list,
and any logging/messages reflect the new name.
- Around line 12-107: Add unit tests for the FileConverter class to cover the
streaming pipeline: create tests/test_file_converter.py and include (1) a
round-trip test that writes random bytes, uses
FileConverter.compress_gzip_stream to compress into a buffer and then
FileConverter.decompress_gzip_stream to decompress and assert original bytes are
restored; (2) tests for validate_checksum_stream that assert True on a correct
SHA-256 and that an IOError is raised on a bad hash; (3) a test that calls
FileConverter.decompress_gzip_stream with validate_checksum=True and asserts the
returned hex digest equals the SHA-256 of the decompressed bytes; and (4) a test
for non-seekable input to validate_checksum_stream using a custom non-seekable
wrapper (or io.BufferedReader over a pipe-like object) to ensure validation
still works without calling seek; use BinaryIO-compatible buffers (io.BytesIO)
and reference FileConverter.compress_gzip_stream,
FileConverter.decompress_gzip_stream, and FileConverter.validate_checksum_stream
in assertions.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@databusclient/extensions/file_converter.py`:
- Around line 102-109: The input_stream is only seeked back on success so when
the checksum check raises an IOError the stream stays at EOF; move the
input_stream.seek(0) to before the comparison of computed vs expected_checksum
(i.e., seek the input_stream right after computing computed = hasher.hexdigest()
and before comparing computed.lower() != expected_checksum.lower()) so the
stream is rewound regardless of an exception, and update the function docstring
to drop the word "successful" (so it documents that the stream will be rewound
even when a checksum mismatch occurs).

---

Duplicate comments:
In `@databusclient/extensions/file_converter.py`:
- Line 21: The parameter name validate_checksum is misleading because the
function only computes and returns a digest; rename the parameter to
compute_checksum across the FileConverter API in
databusclient/extensions/file_converter.py (update the function signature where
validate_checksum appears, any internal uses/branches that check it, and all
docstrings that reference "validate_checksum") and update all callers/tests to
use compute_checksum so the name matches behavior and the docstrings that
already say "compute" remain consistent.

@vaibhav45sktech
Contributor Author

Greetings @Integer-Ctrl, could you please review this PR whenever you are available? I might be mistaken in some of the changes, so your guidance and feedback would be very helpful. Thank you!

@Integer-Ctrl
Contributor

Hi @vaibhav45sktech,

This PR misses the actual task.

You have introduced a class that can be used to convert between compression formats, but this logic is not built into the download pipeline anywhere, or have I overlooked it? If not, it is code that has no use.

However, the base conversion and checksum validation are already implemented; see:

# --- 6. Validate checksum on original downloaded file (BEFORE conversion) ---
if validate_checksum:
    # reuse compute_sha256_and_length from webdav extension
    try:
        actual, _ = compute_sha256_and_length(filename)
    except (OSError, IOError) as e:
        print(f"WARNING: error computing checksum for {filename}: {e}")
        actual = None
    if expected_checksum is None:
        print(f"WARNING: no expected checksum available for {filename}; skipping validation")
    elif actual is None:
        print(f"WARNING: could not compute checksum for {filename}; skipping validation")
    else:
        if actual.lower() != expected_checksum.lower():
            try:
                os.remove(filename)  # delete corrupted file
            except OSError:
                pass
            raise IOError(
                f"Checksum mismatch for {filename}: expected {expected_checksum}, got {actual}"
            )

# --- 7. Convert compression format if requested (AFTER validation) ---
should_convert, source_format = _should_convert_file(file, convert_to, convert_from)
if should_convert and source_format:
    target_filename = _get_converted_filename(file, source_format, convert_to)
    target_filepath = os.path.join(localDir, target_filename)
    _convert_compression_format(filename, target_filepath, source_format, convert_to)

Ticket #54 only contains ideas for improving the file conversion.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 6

🧹 Nitpick comments (3)
databusclient/api/download.py (2)

164-166: Missing space in return{} (Line 166).

Minor style nit; adding the space keeps the code consistent with PEP 8.

Proposed fix
-        return{}
+        return {}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@databusclient/api/download.py` around lines 164 - 166, Fix the PEP8 spacing
issue in databusclient/api/download.py by replacing the tight brace in the empty
dict return with a space: change the `return{}` statement (in the block where
`graph = jd` is set) to `return {}` so it conforms to the spacing convention.

366-419: "Streaming" path downloads the entire file to disk first, then converts.

The comment says "Streaming download + conversion in a single pass" (Line 380), but the implementation writes the full compressed file to disk (Line 389-394), validates the checksum, then calls _convert_compression_format which reads and re-writes. This is download-then-convert, not single-pass streaming. The approach is reasonable for checksum-on-compressed-bytes, but the comment is misleading.
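
For contrast, a genuinely single-pass approach could hash the compressed bytes while decompressing them on the fly. This is an illustrative sketch only, not the code under review; the gzip-only scope and the helper name are assumptions:

import hashlib
import zlib

import requests


def download_and_decompress_gzip(url: str, dest_path: str, expected_sha256: str | None = None) -> None:
    """Stream a gzip file, hashing the compressed bytes and writing decompressed output in one pass."""
    hasher = hashlib.sha256()
    decompressor = zlib.decompressobj(wbits=16 + zlib.MAX_WBITS)  # expect gzip framing
    with requests.get(url, stream=True) as response, open(dest_path, "wb") as out:
        response.raise_for_status()
        for chunk in response.iter_content(chunk_size=1 << 20):
            hasher.update(chunk)                       # checksum of the compressed bytes
            out.write(decompressor.decompress(chunk))  # decompressed output
        out.write(decompressor.flush())
    if expected_sha256 and hasher.hexdigest().lower() != expected_sha256.lower():
        # a real implementation would also remove the partially written file here
        raise IOError(f"Checksum mismatch for {url}: got {hasher.hexdigest()}")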

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@databusclient/api/download.py` around lines 366 - 419, The comment claiming
"Streaming download + conversion in a single pass" is incorrect because the code
(using response.iter_content to write to filename, computing checksum_hasher,
then calling _convert_compression_format) downloads the full compressed file to
disk and only then converts it; update the comment to accurately state that the
implementation downloads the compressed object to a temp file, validates
checksum (when validate_checksum and checksum_hasher are used), and then
performs conversion via _convert_compression_format (or alternatively implement
true streaming conversion that decompresses on the fly while preserving checksum
of compressed bytes if you prefer that behavior); reference symbols:
response.iter_content, checksum_hasher, validate_checksum, filename,
_convert_compression_format, _get_converted_filename, and _should_convert_file.
databusclient/extensions/file_converter.py (1)

72-100: Docstring says magic "takes precedence if the extension is ambiguous" but extension always wins when matched.

Extension is checked first and returned immediately (Line 92). Magic bytes are only consulted when no extension matches. The docstring implies magic could override an extension match. Minor wording nit — consider: "Falls back to magic-number detection when no known extension is found."
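
A sketch of the extension-first, magic-fallback behavior this comment describes; the constant contents and return values are assumptions rather than the module's actual tables:

# illustrative subsets; the real COMPRESSION_EXTENSIONS / MAGIC_NUMBERS may differ
COMPRESSION_EXTENSIONS = {".gz": "gz", ".bz2": "bz2", ".xz": "xz", ".zst": "zstd"}
MAGIC_NUMBERS = {b"\x1f\x8b": "gz", b"BZh": "bz2", b"\xfd7zXZ\x00": "xz", b"\x28\xb5\x2f\xfd": "zstd"}


def detect_format(filename: str, header: bytes = b"") -> str:
    """Extension match wins outright; magic numbers are only a fallback."""
    for ext, fmt in COMPRESSION_EXTENSIONS.items():
        if filename.lower().endswith(ext):
            return fmt
    for magic, fmt in MAGIC_NUMBERS.items():
        if header.startswith(magic):
            return fmt
    return "none"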

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@databusclient/extensions/file_converter.py` around lines 72 - 100, The
docstring for FileConverter.detect_format inaccurately states magic-number
detection "takes precedence if the extension is ambiguous" while the
implementation always returns on an extension match; update the docstring to
reflect the actual behavior (extension-based detection first, magic-number
detection used as a fallback when no known extension is found) or alternatively
change the implementation to consult FileConverter.detect_format_by_magic before
honoring an extension; reference the detect_format, detect_format_by_magic
methods and COMPRESSION_EXTENSIONS to locate the logic to update.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@databusclient/api/download.py`:
- Around line 17-21: The import list in databusclient/api/download.py includes
unused symbols COMPRESSION_EXTENSIONS and COMPRESSION_MODULES which cause a Ruff
F401 failure; update the import from databusclient.extensions.file_converter to
only import FileConverter (remove COMPRESSION_EXTENSIONS and
COMPRESSION_MODULES) or, if those constants are needed later, reference them
where used or mark as used, ensuring the FileConverter import remains intact
(look for the import statement that currently lists FileConverter,
COMPRESSION_EXTENSIONS, COMPRESSION_MODULES).
- Line 1068: The print statement currently prints a literal "{}" instead of
interpolating databusURI; replace the incorrect print("QUERY {}",
databusURI.replace("\n", " ")) with a proper interpolated string (e.g., using an
f-string or .format) so the cleaned databusURI value is included in the log;
update the print call near the databusURI usage in databusclient/api/download.py
to use f"QUERY {databusURI.replace('\\n', ' ')}" (or equivalent) so the URI is
shown correctly.
- Line 95: The file mixes PEP 604 union syntax (str | None) and typing.Optional;
standardize to typing.Optional[str] for consistency by replacing all occurrences
of "str | None" with "Optional[str]" (including the function
_extract_checksum_from_node and the other annotated functions noted), and ensure
"from typing import Optional" is present in the top-level imports; update any
type hints like "int | None" similarly if present to keep a uniform style.

In `@databusclient/extensions/file_converter.py`:
- Around line 307-321: The try block in convert_file currently covers both the
read/write loop and os.remove(source_path), so if removing the source fails you
end up treating it as a conversion failure and deleting the target; to fix,
limit the try/except to only the conversion (the with _open_reader/_open_writer
and read/write loop) and move os.remove(source_path) outside that try (or into a
separate post-success block); keep the except behavior that removes target_path
if a conversion exception occurred, but do not remove target_path when source
deletion fails — instead handle source removal failure separately (log or raise
a distinct error) so successful outputs aren't deleted; refer to convert_file,
_open_reader, _open_writer, and FileConverter.CHUNK_SIZE to locate the code to
change.
- Around line 455-462: The zstd wrappers in _wrap_reader and _wrap_writer
currently call _zstd.ZstdDecompressor().stream_reader(...) and
_zstd.ZstdCompressor().stream_writer(...) with the default closefd=True which
closes caller-owned streams: update the calls in _wrap_reader (stream_reader)
and _wrap_writer (stream_writer) to pass closefd=False so the wrapper won't
close the underlying BinaryIO owned by the caller; keep existing behavior
elsewhere unchanged.

In `@tests/test_file_converter.py`:
- Around line 24-34: The test file imports several names that are never used
causing Ruff F401; remove the unused imports COMPRESSION_EXTENSIONS and
COMPRESSION_MODULES from databusclient.extensions.file_converter and remove
_detect_compression_format, _get_converted_filename, and
_convert_compression_format from databusclient.api.download so only
FileConverter and _should_convert_file (or whatever names the tests actually
use) remain imported, or alternatively reference those imported symbols in the
tests if they are intended to be tested; update the import block in
tests/test_file_converter.py to import only the symbols actually used (e.g.,
FileConverter and _should_convert_file) to resolve the linter error.

---

Nitpick comments:
In `@databusclient/api/download.py`:
- Around line 164-166: Fix the PEP8 spacing issue in
databusclient/api/download.py by replacing the tight brace in the empty dict
return with a space: change the `return{}` statement (in the block where `graph
= jd` is set) to `return {}` so it conforms to the spacing convention.
- Around line 366-419: The comment claiming "Streaming download + conversion in
a single pass" is incorrect because the code (using response.iter_content to
write to filename, computing checksum_hasher, then calling
_convert_compression_format) downloads the full compressed file to disk and only
then converts it; update the comment to accurately state that the implementation
downloads the compressed object to a temp file, validates checksum (when
validate_checksum and checksum_hasher are used), and then performs conversion
via _convert_compression_format (or alternatively implement true streaming
conversion that decompresses on the fly while preserving checksum of compressed
bytes if you prefer that behavior); reference symbols: response.iter_content,
checksum_hasher, validate_checksum, filename, _convert_compression_format,
_get_converted_filename, and _should_convert_file.

In `@databusclient/extensions/file_converter.py`:
- Around line 72-100: The docstring for FileConverter.detect_format inaccurately
states magic-number detection "takes precedence if the extension is ambiguous"
while the implementation always returns on an extension match; update the
docstring to reflect the actual behavior (extension-based detection first,
magic-number detection used as a fallback when no known extension is found) or
alternatively change the implementation to consult
FileConverter.detect_format_by_magic before honoring an extension; reference the
detect_format, detect_format_by_magic methods and COMPRESSION_EXTENSIONS to
locate the logic to update.

Comment on lines +17 to +21
from databusclient.extensions.file_converter import (
    FileConverter,
    COMPRESSION_EXTENSIONS,
    COMPRESSION_MODULES,
)

⚠️ Potential issue | 🔴 Critical

Unused imports COMPRESSION_EXTENSIONS and COMPRESSION_MODULES cause pipeline failure.

Ruff F401 confirms these are imported but never used in this file. The pipeline is currently failing because of this.

🐛 Proposed fix
 from databusclient.extensions.file_converter import (
     FileConverter,
-    COMPRESSION_EXTENSIONS,
-    COMPRESSION_MODULES,
 )
🧰 Tools
🪛 GitHub Actions: Python CI (Lint & pytest)

[error] 19-19: F401 'databusclient.extensions.file_converter.COMPRESSION_EXTENSIONS' imported but unused

🪛 GitHub Check: build

[failure] 20-20: Ruff (F401)
databusclient/api/download.py:20:5: F401 databusclient.extensions.file_converter.COMPRESSION_MODULES imported but unused


[failure] 19-19: Ruff (F401)
databusclient/api/download.py:19:5: F401 databusclient.extensions.file_converter.COMPRESSION_EXTENSIONS imported but unused

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@databusclient/api/download.py` around lines 17 - 21, The import list in
databusclient/api/download.py includes unused symbols COMPRESSION_EXTENSIONS and
COMPRESSION_MODULES which cause a Ruff F401 failure; update the import from
databusclient.extensions.file_converter to only import FileConverter (remove
COMPRESSION_EXTENSIONS and COMPRESSION_MODULES) or, if those constants are
needed later, reference them where used or mark as used, ensuring the
FileConverter import remains intact (look for the import statement that
currently lists FileConverter, COMPRESSION_EXTENSIONS, COMPRESSION_MODULES).

# compiled regex for SHA-256 hex strings
_SHA256_RE = re.compile(r"^[0-9a-fA-F]{64}$")

def _extract_checksum_from_node(node) -> str | None:

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

# Check the project's minimum Python version
echo "=== setup.py / setup.cfg / pyproject.toml ==="
for f in setup.py setup.cfg pyproject.toml; do
  if [ -f "$f" ]; then
    echo "--- $f ---"
    cat "$f"
  fi
done

echo "=== python_requires ==="
rg -n 'python_requires|python_version|target-version' setup.py setup.cfg pyproject.toml 2>/dev/null || echo "Not found"

Repository: dbpedia/databus-python-client

Length of output: 919


🏁 Script executed:

# Examine the download.py file to verify union syntax usage
echo "=== Check lines 95, 183, 467, 788 in databusclient/api/download.py ==="
if [ -f "databusclient/api/download.py" ]; then
  wc -l databusclient/api/download.py
  echo "--- Line 95 ---"
  sed -n '93,97p' databusclient/api/download.py
  echo "--- Line 183 ---"
  sed -n '181,185p' databusclient/api/download.py
  echo "--- Line 467 ---"
  sed -n '465,469p' databusclient/api/download.py
  echo "--- Line 788 ---"
  sed -n '786,790p' databusclient/api/download.py
else
  echo "File not found"
fi

Repository: dbpedia/databus-python-client

Length of output: 955


🏁 Script executed:

# Check for __future__ imports and overall union type usage
echo "=== Checking __future__ imports and union syntax in download.py ==="
if [ -f "databusclient/api/download.py" ]; then
  echo "--- __future__ imports ---"
  head -20 databusclient/api/download.py | rg -n "from __future__"
  
  echo "--- Count of X | Y syntax (union types) ---"
  rg -n '\s\|\s' databusclient/api/download.py | head -20
  
  echo "--- Count of Optional usage ---"
  rg -n 'Optional\[' databusclient/api/download.py | head -20
fi

Repository: dbpedia/databus-python-client

Length of output: 780


Standardize union type syntax for consistency.

The code mixes str | None syntax (lines 95, 183, 226, 467, 501, 788) with Optional[str] syntax (lines 24, 38-39), which is inconsistent. While both are valid for Python 3.11+ (the project's minimum version per pyproject.toml), use one style uniformly throughout the file for readability.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@databusclient/api/download.py` at line 95, The file mixes PEP 604 union
syntax (str | None) and typing.Optional; standardize to typing.Optional[str] for
consistency by replacing all occurrences of "str | None" with "Optional[str]"
(including the function _extract_checksum_from_node and the other annotated
functions noted), and ensure "from typing import Optional" is present in the
top-level imports; update any type hints like "int | None" similarly if present
to keep a uniform style.

print("query in file not supported yet")
# query as argument
else:
print("QUERY {}", databusURI.replace("\n", " "))

⚠️ Potential issue | 🟡 Minor

Literal {} printed in log output — likely a .format() remnant.

print("QUERY {}", databusURI.replace("\n", " ")) prints the literal string {} because print treats its arguments as separate positional values (separated by space). This should be an f-string.

🐛 Proposed fix
-            print("QUERY {}", databusURI.replace("\n", " "))
+            print(f"QUERY {databusURI.replace(chr(10), ' ')}")
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@databusclient/api/download.py` at line 1068, The print statement currently
prints a literal "{}" instead of interpolating databusURI; replace the incorrect
print("QUERY {}", databusURI.replace("\n", " ")) with a proper interpolated
string (e.g., using an f-string or .format) so the cleaned databusURI value is
included in the log; update the print call near the databusURI usage in
databusclient/api/download.py to use f"QUERY {databusURI.replace('\\n', ' ')}"
(or equivalent) so the URI is shown correctly.

Comment on lines +307 to +321
        try:
            with _open_reader(source_path, source_format) as reader:
                with _open_writer(target_path, target_format) as writer:
                    while True:
                        chunk = reader.read(FileConverter.CHUNK_SIZE)
                        if not chunk:
                            break
                        writer.write(chunk)

            os.remove(source_path)
            print(f"Conversion complete: {os.path.basename(target_path)}")
        except Exception as e:
            if os.path.exists(target_path):
                os.remove(target_path)
            raise RuntimeError(f"Compression conversion failed: {e}")

⚠️ Potential issue | 🟠 Major

convert_file exception handler catches its own os.remove(source_path) failure.

If the I/O conversion succeeds but os.remove(source_path) on Line 316 raises (e.g., permission error), the exception handler on Line 318 will also try to remove target_path — deleting the successfully converted output. os.remove(source_path) should be outside the try block.

🐛 Proposed fix
         try:
             with _open_reader(source_path, source_format) as reader:
                 with _open_writer(target_path, target_format) as writer:
                     while True:
                         chunk = reader.read(FileConverter.CHUNK_SIZE)
                         if not chunk:
                             break
                         writer.write(chunk)
-
-            os.remove(source_path)
-            print(f"Conversion complete: {os.path.basename(target_path)}")
         except Exception as e:
             if os.path.exists(target_path):
                 os.remove(target_path)
             raise RuntimeError(f"Compression conversion failed: {e}")
+
+        os.remove(source_path)
+        print(f"Conversion complete: {os.path.basename(target_path)}")
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@databusclient/extensions/file_converter.py` around lines 307 - 321, The try
block in convert_file currently covers both the read/write loop and
os.remove(source_path), so if removing the source fails you end up treating it
as a conversion failure and deleting the target; to fix, limit the try/except to
only the conversion (the with _open_reader/_open_writer and read/write loop) and
move os.remove(source_path) outside that try (or into a separate post-success
block); keep the except behavior that removes target_path if a conversion
exception occurred, but do not remove target_path when source deletion fails —
instead handle source removal failure separately (log or raise a distinct error)
so successful outputs aren't deleted; refer to convert_file, _open_reader,
_open_writer, and FileConverter.CHUNK_SIZE to locate the code to change.

Comment on lines +455 to +462
def _wrap_reader(stream: BinaryIO, fmt: str):
    """Wrap *stream* so that ``read()`` yields decompressed bytes."""
    if fmt == "none":
        return stream
    if fmt == "zstd" and _HAS_ZSTD:
        dctx = _zstd.ZstdDecompressor()
        return dctx.stream_reader(stream)
    return COMPRESSION_MODULES[fmt].open(stream, "rb")

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# First, let's look at the file structure and the specific functions
cat -n databusclient/extensions/file_converter.py | sed -n '450,480p'

Repository: dbpedia/databus-python-client

Length of output: 992


🏁 Script executed:

#!/bin/bash
# Also check the convert_stream function around line 375
cat -n databusclient/extensions/file_converter.py | sed -n '370,410p'

Repository: dbpedia/databus-python-client

Length of output: 1848


🏁 Script executed:

#!/bin/bash
# Get more context on how reader/writer are used in convert_stream
rg -n 'def convert_stream|reader\.close|writer\.close|_wrap_reader|_wrap_writer' databusclient/extensions/file_converter.py -A 2

Repository: dbpedia/databus-python-client

Length of output: 776


🏁 Script executed:

#!/bin/bash
# Get the full convert_stream function
cat -n databusclient/extensions/file_converter.py | sed -n '328,380p'

Repository: dbpedia/databus-python-client

Length of output: 2498


🏁 Script executed:

#!/bin/bash
# Check what _NullCtx does (used by _wrap_writer for "none" format)
cat -n databusclient/extensions/file_converter.py | sed -n '440,453p'

Repository: dbpedia/databus-python-client

Length of output: 438


🏁 Script executed:

#!/bin/bash
# Check imports to understand zstd usage
head -50 databusclient/extensions/file_converter.py | grep -n 'import\|zstd'

Repository: dbpedia/databus-python-client

Length of output: 499


🏁 Script executed:

#!/bin/bash
# Search for tests related to convert_stream and zstd
find . -name '*test*.py' -type f | head -20

Repository: dbpedia/databus-python-client

Length of output: 240


🏁 Script executed:

#!/bin/bash
# Look for any test files that test convert_stream
fd -e py test | xargs rg -l 'convert_stream|_wrap_reader|_wrap_writer' 2>/dev/null

Repository: dbpedia/databus-python-client

Length of output: 101


🌐 Web query:

zstandard Python library stream_reader close() behavior underlying stream

💡 Result:

In python-zstandard, the object returned by ZstdDecompressor.stream_reader(source, ...) / ZstdCompressor.stream_reader(source, ...) is a io.RawIOBase-style wrapper, and calling close() on it will (by default) also call close() on the underlying source stream.

  • The stream_reader() APIs have a closefd parameter: closefd=True means closing the zstd reader closes the underlying stream; closefd=False prevents that. [1]
  • This default behavior was changed so that stream_reader() now defaults to closing the source stream when the wrapper is closed, and you must pass closefd=False if you don’t want the underlying stream closed. [2]

So, if you need to keep the underlying stream open:

reader = zstandard.ZstdDecompressor().stream_reader(fh, closefd=False)
...
reader.close()   # fh remains open

Sources: [1] [2]


🌐 Web query:

zstandard Python library stream_writer context manager underlying stream not closed

💡 Result:

In python-zstandard, ZstdCompressor.stream_writer() returns a ZstdCompressionWriter that does close the underlying writer only if:

  • the underlying object has a .close() method, and
  • closefd=True (the default).

Using it as a context manager (with cctx.stream_writer(fh) as compressor:) calls close() on exit, and close() will also call writer.close() when closefd=True. If you pass closefd=False, the underlying stream will be left open. [1]


🏁 Script executed:

#!/bin/bash
# Look at the test file for file_converter
cat -n tests/test_file_converter.py | head -100

Repository: dbpedia/databus-python-client

Length of output: 4380


🏁 Script executed:

#!/bin/bash
# Search for any tests that verify stream closing behavior
rg -n 'closefd|close.*stream|input_stream|output_stream' tests/test_file_converter.py

Repository: dbpedia/databus-python-client

Length of output: 55


🏁 Script executed:

#!/bin/bash
# Check if there are any tests that verify the underlying streams aren't closed
rg -A 5 'def test.*convert_stream' tests/test_file_converter.py

Repository: dbpedia/databus-python-client

Length of output: 703


🏁 Script executed:

#!/bin/bash
# Get full test implementations to see what happens after convert_stream
cat -n tests/test_file_converter.py | sed -n '180,250p'

Repository: dbpedia/databus-python-client

Length of output: 2929


🏁 Script executed:

#!/bin/bash
# Also check if there are zstd-specific tests
rg -B 2 -A 15 'test.*zstd|test.*convert_stream' tests/test_file_converter.py | head -80

Repository: dbpedia/databus-python-client

Length of output: 2655


🏁 Script executed:

#!/bin/bash
# Let's verify the standard library behavior for gzip/bz2/lzma with file-like objects
python3 << 'EOF'
import io
import gzip
import bz2

# Test with gzip
print("=== Testing gzip.open() ===")
data = b"test data"
buf = io.BytesIO()
with gzip.open(buf, "wb") as f:
    f.write(data)

buf.seek(0)
print(f"Before reading: buf.closed = {buf.closed}")
reader = gzip.open(buf, "rb")
print(f"After gzip.open(buf, 'rb'): buf.closed = {buf.closed}")
content = reader.read()
print(f"After reader.read(): buf.closed = {buf.closed}")
reader.close()
print(f"After reader.close(): buf.closed = {buf.closed}")

# Try to seek on closed BytesIO
try:
    buf.seek(0)
    print("buf.seek(0) succeeded - buf is still usable")
except ValueError as e:
    print(f"buf.seek(0) failed: {e}")

print("\n=== Testing bz2.open() ===")
buf2 = io.BytesIO()
with bz2.open(buf2, "wb") as f:
    f.write(data)

buf2.seek(0)
reader2 = bz2.open(buf2, "rb")
reader2.close()
print(f"After reader.close(): buf2.closed = {buf2.closed}")

try:
    buf2.seek(0)
    print("buf2.seek(0) succeeded - buf2 is still usable")
except ValueError as e:
    print(f"buf2.seek(0) failed: {e}")
EOF

Repository: dbpedia/databus-python-client

Length of output: 423


🏁 Script executed:

#!/bin/bash
# Final verification: check if there are any other calls to stream_reader/stream_writer in the codebase
rg -n 'stream_reader|stream_writer' databusclient/

Repository: dbpedia/databus-python-client

Length of output: 423


🏁 Script executed:

#!/bin/bash
# Check the functions at lines 427 and 438
cat -n databusclient/extensions/file_converter.py | sed -n '420,440p'

Repository: dbpedia/databus-python-client

Length of output: 944


🏁 Script executed:

#!/bin/bash
# Get more context to understand these functions
cat -n databusclient/extensions/file_converter.py | sed -n '410,445p'

Repository: dbpedia/databus-python-client

Length of output: 1438


🏁 Script executed:

#!/bin/bash
# Check how _open_reader and _open_writer are used
rg -B 3 -A 5 '_open_reader\(|_open_writer\(' databusclient/extensions/file_converter.py

Repository: dbpedia/databus-python-client

Length of output: 998


🏁 Script executed:

#!/bin/bash
# Let me see the full convert_file function to confirm the context
cat -n databusclient/extensions/file_converter.py | sed -n '300,330p'

Repository: dbpedia/databus-python-client

Length of output: 1488


🏁 Script executed:

#!/bin/bash
# Confirm that _open_reader/_open_writer own the file handles they create
# while _wrap_reader/_wrap_writer receive caller-owned streams

echo "=== _open_reader and _open_writer ==="
echo "These functions OPEN file paths themselves, so they OWN the file handles."
echo "They return context managers that are used as: with _open_reader(...) as reader:"
echo "When used with closefd=True (default), closing releases the file they opened. ✓ CORRECT"

echo ""
echo "=== _wrap_reader and _wrap_writer ==="
echo "These functions receive already-opened BinaryIO objects from the caller."
echo "The caller OWNS these streams and expects them to remain open."
echo "When used in convert_stream, with closefd=True (default), closing also closes caller-owned streams. ✗ WRONG"

Repository: dbpedia/databus-python-client

Length of output: 636


Pass closefd=False to zstd stream wrappers in _wrap_reader and _wrap_writer to prevent closing caller-owned streams.

The stream_reader() call at line 461 and stream_writer() call at line 471 default to closefd=True, causing the caller-owned input_stream and output_stream to be closed when the wrappers are closed (line 375 and line 364's with block exit). Since the caller owns these streams, add closefd=False to both:

Required changes
# Line 461 (in _wrap_reader)
return dctx.stream_reader(stream, closefd=False)

# Line 471 (in _wrap_writer)
return cctx.stream_writer(stream, closefd=False)

Note: The similar calls in _open_reader (line 427) and _open_writer (line 438) are correct as-is—those functions own the file handles they create, so closefd=True is appropriate.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@databusclient/extensions/file_converter.py` around lines 455 - 462, The zstd
wrappers in _wrap_reader and _wrap_writer currently call
_zstd.ZstdDecompressor().stream_reader(...) and
_zstd.ZstdCompressor().stream_writer(...) with the default closefd=True which
closes caller-owned streams: update the calls in _wrap_reader (stream_reader)
and _wrap_writer (stream_writer) to pass closefd=False so the wrapper won't
close the underlying BinaryIO owned by the caller; keep existing behavior
elsewhere unchanged.

Comment on lines +24 to +34
from databusclient.extensions.file_converter import (
    FileConverter,
    COMPRESSION_EXTENSIONS,
    COMPRESSION_MODULES,
)
from databusclient.api.download import (
    _detect_compression_format,
    _should_convert_file,
    _get_converted_filename,
    _convert_compression_format,
)

⚠️ Potential issue | 🔴 Critical

Unused imports cause pipeline failure (Ruff F401).

COMPRESSION_EXTENSIONS, COMPRESSION_MODULES, _detect_compression_format, _get_converted_filename, and _convert_compression_format are imported but never referenced in the test file. The CI build is failing because of these.

🐛 Proposed fix
 from databusclient.extensions.file_converter import (
     FileConverter,
-    COMPRESSION_EXTENSIONS,
-    COMPRESSION_MODULES,
-)
-from databusclient.api.download import (
-    _detect_compression_format,
-    _should_convert_file,
-    _get_converted_filename,
-    _convert_compression_format,
 )
+from databusclient.api.download import _should_convert_file
 from databusclient.cli import download
🧰 Tools
🪛 GitHub Check: build

[failure] 33-33: Ruff (F401)
tests/test_file_converter.py:33:5: F401 databusclient.api.download._convert_compression_format imported but unused


[failure] 32-32: Ruff (F401)
tests/test_file_converter.py:32:5: F401 databusclient.api.download._get_converted_filename imported but unused


[failure] 30-30: Ruff (F401)
tests/test_file_converter.py:30:5: F401 databusclient.api.download._detect_compression_format imported but unused


[failure] 27-27: Ruff (F401)
tests/test_file_converter.py:27:5: F401 databusclient.extensions.file_converter.COMPRESSION_MODULES imported but unused


[failure] 26-26: Ruff (F401)
tests/test_file_converter.py:26:5: F401 databusclient.extensions.file_converter.COMPRESSION_EXTENSIONS imported but unused

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/test_file_converter.py` around lines 24 - 34, The test file imports
several names that are never used causing Ruff F401; remove the unused imports
COMPRESSION_EXTENSIONS and COMPRESSION_MODULES from
databusclient.extensions.file_converter and remove _detect_compression_format,
_get_converted_filename, and _convert_compression_format from
databusclient.api.download so only FileConverter and _should_convert_file (or
whatever names the tests actually use) remain imported, or alternatively
reference those imported symbols in the tests if they are intended to be tested;
update the import block in tests/test_file_converter.py to import only the
symbols actually used (e.g., FileConverter and _should_convert_file) to resolve
the linter error.

Development

Successfully merging this pull request may close these issues.

feature: advanced file conversion
