feat: Add file_converter extension module (Issue #54) #56
vaibhav45sktech wants to merge 6 commits into dbpedia:main from
Conversation
- Create new file_converter.py extension module in databusclient/extensions/
- Implements FileConverter class with streaming pipeline support
- Supports gzip decompression with optional checksum validation
- Provides methods for compress_gzip_stream and decompress_gzip_stream
- Minimal version as suggested in issue to start with gzip + checksum
- Can be extended later to support more compression formats
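A minimal usage sketch of the two gzip helpers listed above; the exact signatures (streams in and out, an optional checksum flag, a returned hex digest) are assumptions based on this PR description and the review diffs further down, not a confirmed API:

import io

from databusclient.extensions.file_converter import FileConverter

raw = io.BytesIO(b"some N-Triples payload")
gz = io.BytesIO()
FileConverter.compress_gzip_stream(raw, gz)          # gzip-compress raw into gz

gz.seek(0)
out = io.BytesIO()
digest = FileConverter.decompress_gzip_stream(gz, out, validate_checksum=True)

assert out.getvalue() == b"some N-Triples payload"   # round-trip restores the bytes
print(digest)                                        # SHA-256 hex digest of the decompressed output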
📝 Walkthrough

Adds a new streaming FileConverter extension (multi-format compress/decompress, detect, checksum, convert utilities) and integrates it into the download CLI and API flow; adds tests for conversion and updates package exports to expose FileConverter.

Changes
Sequence Diagram(s)

sequenceDiagram
participant CLI as CLI
participant Download as Download API
participant Vault as Vault/Auth
participant Remote as Remote Server (artifact)
participant Converter as FileConverter
participant FS as Local filesystem
CLI->>Download: request download (convert_to / decompress)
Download->>Vault: request token exchange (if required)
Vault-->>Download: bearer token
Download->>Remote: GET artifact (with token if any) [stream]
Remote-->>Download: streamed bytes
Download->>Converter: stream bytes + source_format,target_format,validate_checksum
Converter-->>Download: converted stream (on-the-fly) / checksum result
Download->>FS: write final file (or temp + cleanup)
FS-->>CLI: success / error
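A rough, non-authoritative sketch of the flow in the diagram above, using only requests and the converter; the artifact URL is hypothetical, the converter call signature is assumed, and buffering in memory stands in for the on-the-fly streaming shown in the diagram:

import hashlib
import io

import requests

from databusclient.extensions.file_converter import FileConverter

resp = requests.get("https://databus.example.org/some/artifact.nt.gz", stream=True)
resp.raise_for_status()

compressed = io.BytesIO()
hasher = hashlib.sha256()
for chunk in resp.iter_content(chunk_size=8192):
    hasher.update(chunk)        # checksum over the compressed bytes, as in the diagram
    compressed.write(chunk)
compressed.seek(0)

with open("artifact.nt", "wb") as out:
    FileConverter.decompress_gzip_stream(compressed, out)

print(hasher.hexdigest())       # compare against the expected checksum from the metadata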
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs
🚥 Pre-merge checks | ✅ 1 | ❌ 4

❌ Failed checks (4 warnings)
✅ Passed checks (1 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@databusclient/extensions/file_converter.py`:
- Around line 38-50: The code creates source_hasher from expected_checksum but
never uses it, so remove the unused expected_checksum parameter and the
source_hasher/hash-from-compressed-stream logic from the FileConverter method
that reads gzip (the block using gzip.open, FileConverter.CHUNK_SIZE, hasher,
and output_stream), keep only validate_checksum-driven hasher for decompressed
chunks, and update the method signature and return value accordingly; then
update callers to perform checksum validation on the compressed input stream
(e.g., via validate_checksum_stream) before calling this decompression routine.
🧹 Nitpick comments (1)
databusclient/extensions/file_converter.py (1)
70-103: `seek(0)` assumes a seekable stream; return type is misleading. Two concerns:

- Line 88: `input_stream.seek(0)` will raise on non-seekable streams (e.g., network response bodies, pipes). Since the PR objective targets integration with the download pipeline, callers will need to be aware of this constraint. Consider either documenting the seekable requirement, removing the `seek(0)` call (let the caller manage stream position), or accepting a non-seekable stream and removing the seek.
- Return type: The method signature says `-> bool` but it never returns `False` — it either returns `True` or raises `IOError`. Consider returning `bool` without raising (let the caller decide), or changing the return type to `None` and only raising on failure. Pick one contract and be consistent.

Suggested approach: remove seek, return bool without raising

 @staticmethod
 def validate_checksum_stream(
     input_stream: BinaryIO, expected_checksum: str
 ) -> bool:
     """Validate SHA256 checksum of a stream.

     Args:
-        input_stream: Input stream to validate
+        input_stream: Input stream to validate (must be positioned at start)
         expected_checksum: Expected SHA256 checksum

     Returns:
-        True if checksum matches
+        True if checksum matches, False otherwise
-
-    Raises:
-        IOError: If checksum validation fails
     """
     hasher = hashlib.sha256()
-    input_stream.seek(0)
-
+
     while True:
         chunk = input_stream.read(FileConverter.CHUNK_SIZE)
         if not chunk:
             break
         hasher.update(chunk)
-
+
     computed = hasher.hexdigest()
-    if computed.lower() != expected_checksum.lower():
-        raise IOError(
-            f"Checksum mismatch: expected {expected_checksum}, "
-            f"got {computed}"
-        )
-
-    return True
+    return computed.lower() == expected_checksum.lower()
Actionable comments posted: 2
🧹 Nitpick comments (3)
databusclient/extensions/file_converter.py (3)
102-105: `IOError` is semantically incorrect for a checksum mismatch; consider `ValueError`.

`IOError` (aliased to `OSError` in Python 3) conventionally signals operating-system-level I/O failures (file not found, disk full, permission denied). A checksum mismatch is a data-integrity error — `ValueError` or a custom `ChecksumMismatchError` would let callers distinguish between a genuine I/O failure and bad data without catching all `OSError`s.

♻️ Proposed change

-        raise IOError(
+        raise ValueError(
             f"Checksum mismatch: expected {expected_checksum}, "
             f"got {computed}"
         )

Also update the docstring:

-    Raises:
-        IOError: If checksum validation fails
+    Raises:
+        ValueError: If checksum validation fails

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@databusclient/extensions/file_converter.py` around lines 102 - 105, Replace the semantically incorrect IOError raised for checksum mismatches with a more appropriate exception: change the raise in file_converter.py (the checksum-checking block that currently raises IOError("Checksum mismatch: expected..., got ...")) to raise ValueError with the same message, or define and raise a custom ChecksumMismatchError class and use that instead; also update the surrounding function/class docstring (the docstring for the checksum verification routine in file_converter.py) to document the new exception type so callers know to catch ValueError or ChecksumMismatchError.
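For illustration, a custom exception along the lines this comment suggests could look like the following; the class name and attributes are hypothetical, not part of the PR:

class ChecksumMismatchError(ValueError):
    """Raised when a computed digest does not match the expected checksum."""

    def __init__(self, expected: str, computed: str):
        super().__init__(
            f"Checksum mismatch: expected {expected}, got {computed}"
        )
        self.expected = expected
        self.computed = computed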
21-21: `validate_checksum` parameter name should be `compute_checksum`. The parameter only computes and returns the digest — it performs no comparison. The docstring correctly describes the behavior as "compute", but the parameter name implies validation. The past review had proposed this rename; it wasn't carried through.
♻️ Proposed rename
 def decompress_gzip_stream(
     input_stream: BinaryIO,
     output_stream: BinaryIO,
-    validate_checksum: bool = False,
+    compute_checksum: bool = False,
 ) -> Optional[str]:
     """Decompress gzip stream with optional checksum computation.
     ...
-        validate_checksum: Whether to compute a SHA-256 checksum of
+        compute_checksum: Whether to compute a SHA-256 checksum of
             the decompressed output.
     ...
     """
-    hasher = hashlib.sha256() if validate_checksum else None
+    hasher = hashlib.sha256() if compute_checksum else None

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@databusclient/extensions/file_converter.py` at line 21, Rename the parameter validate_checksum to compute_checksum throughout the file to match behaviour described in the docstring: update the function/method signature(s) that currently declare validate_checksum (and any default value False) to compute_checksum: bool = False, update all internal variable references and any return/tuple keys or comments that use validate_checksum to compute_checksum, and update any call sites inside databusclient/extensions/file_converter.py (and its unit tests if present) so callers use the new name; ensure type hints, docstring example/parameter list, and any logging/messages reflect the new name.
12-107: No tests provided for the new module. The PR adds a non-trivial streaming pipeline but no unit tests. At minimum, these cases should be covered:
- Round-trip: compress → decompress restores original bytes.
- `validate_checksum_stream` passes on a correct hash and raises on a bad hash.
- `decompress_gzip_stream` with `validate_checksum=True` returns the correct hex digest.
- Non-seekable stream handling for `validate_checksum_stream`.

Would you like me to generate a `tests/test_file_converter.py` skeleton covering the above cases, or open a follow-up issue to track this?

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@databusclient/extensions/file_converter.py` around lines 12 - 107, Add unit tests for the FileConverter class to cover the streaming pipeline: create tests/test_file_converter.py and include (1) a round-trip test that writes random bytes, uses FileConverter.compress_gzip_stream to compress into a buffer and then FileConverter.decompress_gzip_stream to decompress and assert original bytes are restored; (2) tests for validate_checksum_stream that assert True on a correct SHA-256 and that an IOError is raised on a bad hash; (3) a test that calls FileConverter.decompress_gzip_stream with validate_checksum=True and asserts the returned hex digest equals the SHA-256 of the decompressed bytes; and (4) a test for non-seekable input to validate_checksum_stream using a custom non-seekable wrapper (or io.BufferedReader over a pipe-like object) to ensure validation still works without calling seek; use BinaryIO-compatible buffers (io.BytesIO) and reference FileConverter.compress_gzip_stream, FileConverter.decompress_gzip_stream, and FileConverter.validate_checksum_stream in assertions.
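As a starting point, a sketch of one such test is shown below; it assumes the behavior described in this review (return True on a match, raise IOError on a mismatch) and is not taken from the PR itself:

import hashlib
import io

import pytest

from databusclient.extensions.file_converter import FileConverter


def test_validate_checksum_stream_pass_and_fail():
    data = b"databus test payload"
    good = hashlib.sha256(data).hexdigest()

    # correct digest -> True
    assert FileConverter.validate_checksum_stream(io.BytesIO(data), good) is True

    # wrong digest -> IOError (per the implementation under review)
    with pytest.raises(IOError):
        FileConverter.validate_checksum_stream(io.BytesIO(data), "0" * 64)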
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@databusclient/extensions/file_converter.py`:
- Around line 91-107: validate_checksum_stream currently reads to EOF and
returns without resetting the stream, which breaks the validate-then-decompress
pattern used by decompress_gzip_stream; change validate_checksum_stream to seek
input_stream back to 0 before returning and update its docstring to state that
input_stream must be seekable (and that the function will reset the stream
position), so callers like FileConverter.decompress_gzip_stream can safely read
from the start after validation.
- Around line 12-13: The package API currently doesn't export FileConverter from
the extensions package, forcing consumers to import
databusclient.extensions.file_converter.FileConverter directly; update
extensions/__init__.py to import and export FileConverter so it can be accessed
as databusclient.extensions.FileConverter (e.g., add "from .file_converter
import FileConverter" and include it in __all__), or document that only direct
module imports are supported—modify the __init__ export to reference the
FileConverter class name to restore the expected package-level import.
---
Nitpick comments:
In `@databusclient/extensions/file_converter.py`:
- Around line 102-105: Replace the semantically incorrect IOError raised for
checksum mismatches with a more appropriate exception: change the raise in
file_converter.py (the checksum-checking block that currently raises
IOError("Checksum mismatch: expected..., got ...")) to raise ValueError with the
same message, or define and raise a custom ChecksumMismatchError class and use
that instead; also update the surrounding function/class docstring (the
docstring for the checksum verification routine in file_converter.py) to
document the new exception type so callers know to catch ValueError or
ChecksumMismatchError.
- Line 21: Rename the parameter validate_checksum to compute_checksum throughout
the file to match behaviour described in the docstring: update the
function/method signature(s) that currently declare validate_checksum (and any
default value False) to compute_checksum: bool = False, update all internal
variable references and any return/tuple keys or comments that use
validate_checksum to compute_checksum, and update any call sites inside
databusclient/extensions/file_converter.py (and its unit tests if present) so
callers use the new name; ensure type hints, docstring example/parameter list,
and any logging/messages reflect the new name.
- Around line 12-107: Add unit tests for the FileConverter class to cover the
streaming pipeline: create tests/test_file_converter.py and include (1) a
round-trip test that writes random bytes, uses
FileConverter.compress_gzip_stream to compress into a buffer and then
FileConverter.decompress_gzip_stream to decompress and assert original bytes are
restored; (2) tests for validate_checksum_stream that assert True on a correct
SHA-256 and that an IOError is raised on a bad hash; (3) a test that calls
FileConverter.decompress_gzip_stream with validate_checksum=True and asserts the
returned hex digest equals the SHA-256 of the decompressed bytes; and (4) a test
for non-seekable input to validate_checksum_stream using a custom non-seekable
wrapper (or io.BufferedReader over a pipe-like object) to ensure validation
still works without calling seek; use BinaryIO-compatible buffers (io.BytesIO)
and reference FileConverter.compress_gzip_stream,
FileConverter.decompress_gzip_stream, and FileConverter.validate_checksum_stream
in assertions.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@databusclient/extensions/file_converter.py`:
- Around line 102-109: The input_stream is only seeked back on success so when
the checksum check raises an IOError the stream stays at EOF; move the
input_stream.seek(0) to before the comparison of computed vs expected_checksum
(i.e., seek the input_stream right after computing computed = hasher.hexdigest()
and before comparing computed.lower() != expected_checksum.lower()) so the
stream is rewound regardless of an exception, and update the function docstring
to drop the word "successful" (so it documents that the stream will be rewound
even when a checksum mismatch occurs).
---
Duplicate comments:
In `@databusclient/extensions/file_converter.py`:
- Line 21: The parameter name validate_checksum is misleading because the
function only computes and returns a digest; rename the parameter to
compute_checksum across the FileConverter API in
databusclient/extensions/file_converter.py (update the function signature where
validate_checksum appears, any internal uses/branches that check it, and all
docstrings that reference "validate_checksum") and update all callers/tests to
use compute_checksum so the name matches behavior and the docstrings that
already say "compute" remain consistent.
Greetings @Integer-Ctrl, could you please review this PR whenever available? I might be mistaken in some of the changes, so your guidance and feedback would be very helpful. Thank you!
Hi @vaibhav45sktech, This PR misses the actual task. You have introduced a class that can be used to convert compressions. But this logic is not built into the download pipeline anywhere, or have I overlooked it? In that case, it is code that would have no use. However, the base conversion and checksum validation are already implemented; see databus-python-client/databusclient/api/download.py, lines 439 to 467 (commit 704136a). The ticket #54 only contains ideas for improving the file conversion.
Actionable comments posted: 6
🧹 Nitpick comments (3)
databusclient/api/download.py (2)
164-166: Missing space in `return{}` (Line 166). Minor style nit; fixing it keeps the code consistent with PEP 8.
Proposed fix
- return{}
+ return {}

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@databusclient/api/download.py` around lines 164 - 166, Fix the PEP8 spacing issue in databusclient/api/download.py by replacing the tight brace in the empty dict return with a space: change the `return{}` statement (in the block where `graph = jd` is set) to `return {}` so it conforms to the spacing convention.
366-419: "Streaming" path downloads the entire file to disk first, then converts.The comment says "Streaming download + conversion in a single pass" (Line 380), but the implementation writes the full compressed file to disk (Line 389-394), validates the checksum, then calls
_convert_compression_formatwhich reads and re-writes. This is download-then-convert, not single-pass streaming. The approach is reasonable for checksum-on-compressed-bytes, but the comment is misleading.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@databusclient/api/download.py` around lines 366 - 419, The comment claiming "Streaming download + conversion in a single pass" is incorrect because the code (using response.iter_content to write to filename, computing checksum_hasher, then calling _convert_compression_format) downloads the full compressed file to disk and only then converts it; update the comment to accurately state that the implementation downloads the compressed object to a temp file, validates checksum (when validate_checksum and checksum_hasher are used), and then performs conversion via _convert_compression_format (or alternatively implement true streaming conversion that decompresses on the fly while preserving checksum of compressed bytes if you prefer that behavior); reference symbols: response.iter_content, checksum_hasher, validate_checksum, filename, _convert_compression_format, _get_converted_filename, and _should_convert_file.

databusclient/extensions/file_converter.py (1)
72-100: Docstring says magic "takes precedence if the extension is ambiguous" but extension always wins when matched. Extension is checked first and returned immediately (Line 92). Magic bytes are only consulted when no extension matches. The docstring implies magic could override an extension match. Minor wording nit — consider: "Falls back to magic-number detection when no known extension is found."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@databusclient/extensions/file_converter.py` around lines 72 - 100, The docstring for FileConverter.detect_format inaccurately states magic-number detection "takes precedence if the extension is ambiguous" while the implementation always returns on an extension match; update the docstring to reflect the actual behavior (extension-based detection first, magic-number detection used as a fallback when no known extension is found) or alternatively change the implementation to consult FileConverter.detect_format_by_magic before honoring an extension; reference the detect_format, detect_format_by_magic methods and COMPRESSION_EXTENSIONS to locate the logic to update.
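To make the intended precedence concrete, here is a hypothetical standalone sketch of extension-first detection with magic-number fallback (not the module's actual code):

import os

EXT_MAP = {".gz": "gzip", ".bz2": "bz2", ".xz": "xz"}
MAGIC_NUMBERS = {b"\x1f\x8b": "gzip", b"BZh": "bz2", b"\xfd7zXZ\x00": "xz"}


def detect_format(path: str) -> str:
    ext = os.path.splitext(path)[1].lower()
    if ext in EXT_MAP:
        return EXT_MAP[ext]          # extension wins when it matches
    with open(path, "rb") as fh:     # magic numbers only as a fallback
        head = fh.read(6)
    for magic, fmt in MAGIC_NUMBERS.items():
        if head.startswith(magic):
            return fmt
    return "none"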
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@databusclient/api/download.py`:
- Around line 17-21: The import list in databusclient/api/download.py includes
unused symbols COMPRESSION_EXTENSIONS and COMPRESSION_MODULES which cause a Ruff
F401 failure; update the import from databusclient.extensions.file_converter to
only import FileConverter (remove COMPRESSION_EXTENSIONS and
COMPRESSION_MODULES) or, if those constants are needed later, reference them
where used or mark as used, ensuring the FileConverter import remains intact
(look for the import statement that currently lists FileConverter,
COMPRESSION_EXTENSIONS, COMPRESSION_MODULES).
- Line 1068: The print statement currently prints a literal "{}" instead of
interpolating databusURI; replace the incorrect print("QUERY {}",
databusURI.replace("\n", " ")) with a proper interpolated string (e.g., using an
f-string or .format) so the cleaned databusURI value is included in the log;
update the print call near the databusURI usage in databusclient/api/download.py
to use f"QUERY {databusURI.replace('\\n', ' ')}" (or equivalent) so the URI is
shown correctly.
- Line 95: The file mixes PEP 604 union syntax (str | None) and typing.Optional;
standardize to typing.Optional[str] for consistency by replacing all occurrences
of "str | None" with "Optional[str]" (including the function
_extract_checksum_from_node and the other annotated functions noted), and ensure
"from typing import Optional" is present in the top-level imports; update any
type hints like "int | None" similarly if present to keep a uniform style.
In `@databusclient/extensions/file_converter.py`:
- Around line 307-321: The try block in convert_file currently covers both the
read/write loop and os.remove(source_path), so if removing the source fails you
end up treating it as a conversion failure and deleting the target; to fix,
limit the try/except to only the conversion (the with _open_reader/_open_writer
and read/write loop) and move os.remove(source_path) outside that try (or into a
separate post-success block); keep the except behavior that removes target_path
if a conversion exception occurred, but do not remove target_path when source
deletion fails — instead handle source removal failure separately (log or raise
a distinct error) so successful outputs aren't deleted; refer to convert_file,
_open_reader, _open_writer, and FileConverter.CHUNK_SIZE to locate the code to
change.
- Around line 455-462: The zstd wrappers in _wrap_reader and _wrap_writer
currently call _zstd.ZstdDecompressor().stream_reader(...) and
_zstd.ZstdCompressor().stream_writer(...) with the default closefd=True which
closes caller-owned streams: update the calls in _wrap_reader (stream_reader)
and _wrap_writer (stream_writer) to pass closefd=False so the wrapper won't
close the underlying BinaryIO owned by the caller; keep existing behavior
elsewhere unchanged.
In `@tests/test_file_converter.py`:
- Around line 24-34: The test file imports several names that are never used
causing Ruff F401; remove the unused imports COMPRESSION_EXTENSIONS and
COMPRESSION_MODULES from databusclient.extensions.file_converter and remove
_detect_compression_format, _get_converted_filename, and
_convert_compression_format from databusclient.api.download so only
FileConverter and _should_convert_file (or whatever names the tests actually
use) remain imported, or alternatively reference those imported symbols in the
tests if they are intended to be tested; update the import block in
tests/test_file_converter.py to import only the symbols actually used (e.g.,
FileConverter and _should_convert_file) to resolve the linter error.
---
Nitpick comments:
In `@databusclient/api/download.py`:
- Around line 164-166: Fix the PEP8 spacing issue in
databusclient/api/download.py by replacing the tight brace in the empty dict
return with a space: change the `return{}` statement (in the block where `graph
= jd` is set) to `return {}` so it conforms to the spacing convention.
- Around line 366-419: The comment claiming "Streaming download + conversion in
a single pass" is incorrect because the code (using response.iter_content to
write to filename, computing checksum_hasher, then calling
_convert_compression_format) downloads the full compressed file to disk and only
then converts it; update the comment to accurately state that the implementation
downloads the compressed object to a temp file, validates checksum (when
validate_checksum and checksum_hasher are used), and then performs conversion
via _convert_compression_format (or alternatively implement true streaming
conversion that decompresses on the fly while preserving checksum of compressed
bytes if you prefer that behavior); reference symbols: response.iter_content,
checksum_hasher, validate_checksum, filename, _convert_compression_format,
_get_converted_filename, and _should_convert_file.
In `@databusclient/extensions/file_converter.py`:
- Around line 72-100: The docstring for FileConverter.detect_format inaccurately
states magic-number detection "takes precedence if the extension is ambiguous"
while the implementation always returns on an extension match; update the
docstring to reflect the actual behavior (extension-based detection first,
magic-number detection used as a fallback when no known extension is found) or
alternatively change the implementation to consult
FileConverter.detect_format_by_magic before honoring an extension; reference the
detect_format, detect_format_by_magic methods and COMPRESSION_EXTENSIONS to
locate the logic to update.
from databusclient.extensions.file_converter import (
    FileConverter,
    COMPRESSION_EXTENSIONS,
    COMPRESSION_MODULES,
)
Unused imports COMPRESSION_EXTENSIONS and COMPRESSION_MODULES cause pipeline failure.
Ruff F401 confirms these are imported but never used in this file. The pipeline is currently failing because of this.
🐛 Proposed fix
 from databusclient.extensions.file_converter import (
     FileConverter,
-    COMPRESSION_EXTENSIONS,
-    COMPRESSION_MODULES,
 )

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
-from databusclient.extensions.file_converter import (
-    FileConverter,
-    COMPRESSION_EXTENSIONS,
-    COMPRESSION_MODULES,
-)
+from databusclient.extensions.file_converter import (
+    FileConverter,
+)
🧰 Tools
🪛 GitHub Actions: Python CI (Lint & pytest)
[error] 19-19: F401 'databusclient.extensions.file_converter.COMPRESSION_EXTENSIONS' imported but unused
🪛 GitHub Check: build
[failure] 20-20: Ruff (F401)
databusclient/api/download.py:20:5: F401 databusclient.extensions.file_converter.COMPRESSION_MODULES imported but unused
[failure] 19-19: Ruff (F401)
databusclient/api/download.py:19:5: F401 databusclient.extensions.file_converter.COMPRESSION_EXTENSIONS imported but unused
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@databusclient/api/download.py` around lines 17 - 21, The import list in
databusclient/api/download.py includes unused symbols COMPRESSION_EXTENSIONS and
COMPRESSION_MODULES which cause a Ruff F401 failure; update the import from
databusclient.extensions.file_converter to only import FileConverter (remove
COMPRESSION_EXTENSIONS and COMPRESSION_MODULES) or, if those constants are
needed later, reference them where used or mark as used, ensuring the
FileConverter import remains intact (look for the import statement that
currently lists FileConverter, COMPRESSION_EXTENSIONS, COMPRESSION_MODULES).
# compiled regex for SHA-256 hex strings
_SHA256_RE = re.compile(r"^[0-9a-fA-F]{64}$")


def _extract_checksum_from_node(node) -> str | None:
🧩 Analysis chain
🏁 Script executed:
# Check the project's minimum Python version
echo "=== setup.py / setup.cfg / pyproject.toml ==="
for f in setup.py setup.cfg pyproject.toml; do
if [ -f "$f" ]; then
echo "--- $f ---"
cat "$f"
fi
done
echo "=== python_requires ==="
rg -n 'python_requires|python_version|target-version' setup.py setup.cfg pyproject.toml 2>/dev/null || echo "Not found"

Repository: dbpedia/databus-python-client
Length of output: 919
🏁 Script executed:
# Examine the download.py file to verify union syntax usage
echo "=== Check lines 95, 183, 467, 788 in databusclient/api/download.py ==="
if [ -f "databusclient/api/download.py" ]; then
wc -l databusclient/api/download.py
echo "--- Line 95 ---"
sed -n '93,97p' databusclient/api/download.py
echo "--- Line 183 ---"
sed -n '181,185p' databusclient/api/download.py
echo "--- Line 467 ---"
sed -n '465,469p' databusclient/api/download.py
echo "--- Line 788 ---"
sed -n '786,790p' databusclient/api/download.py
else
echo "File not found"
fi

Repository: dbpedia/databus-python-client
Length of output: 955
🏁 Script executed:
# Check for __future__ imports and overall union type usage
echo "=== Checking __future__ imports and union syntax in download.py ==="
if [ -f "databusclient/api/download.py" ]; then
echo "--- __future__ imports ---"
head -20 databusclient/api/download.py | rg -n "from __future__"
echo "--- Count of X | Y syntax (union types) ---"
rg -n '\s\|\s' databusclient/api/download.py | head -20
echo "--- Count of Optional usage ---"
rg -n 'Optional\[' databusclient/api/download.py | head -20
fi

Repository: dbpedia/databus-python-client
Length of output: 780
Standardize union type syntax for consistency.
The code mixes str | None syntax (lines 95, 183, 226, 467, 501, 788) with Optional[str] syntax (lines 24, 38-39), which is inconsistent. While both are valid for Python 3.11+ (the project's minimum version per pyproject.toml), use one style uniformly throughout the file for readability.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@databusclient/api/download.py` at line 95, The file mixes PEP 604 union
syntax (str | None) and typing.Optional; standardize to typing.Optional[str] for
consistency by replacing all occurrences of "str | None" with "Optional[str]"
(including the function _extract_checksum_from_node and the other annotated
functions noted), and ensure "from typing import Optional" is present in the
top-level imports; update any type hints like "int | None" similarly if present
to keep a uniform style.
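For reference, the two spellings are interchangeable at runtime; the suggestion is simply to pick one, e.g.:

from typing import Optional


def _extract_checksum_from_node(node) -> Optional[str]:  # instead of -> str | None
    ...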
| print("query in file not supported yet") | ||
| # query as argument | ||
| else: | ||
| print("QUERY {}", databusURI.replace("\n", " ")) |
Literal {} printed in log output — likely a .format() remnant.
print("QUERY {}", databusURI.replace("\n", " ")) prints the literal string {} because print treats its arguments as separate positional values (separated by space). This should be an f-string.
🐛 Proposed fix
- print("QUERY {}", databusURI.replace("\n", " "))
+ print(f"QUERY {databusURI.replace(chr(10), ' ')}")📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| print("QUERY {}", databusURI.replace("\n", " ")) | |
| print(f"QUERY {databusURI.replace(chr(10), ' ')}") |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@databusclient/api/download.py` at line 1068, The print statement currently
prints a literal "{}" instead of interpolating databusURI; replace the incorrect
print("QUERY {}", databusURI.replace("\n", " ")) with a proper interpolated
string (e.g., using an f-string or .format) so the cleaned databusURI value is
included in the log; update the print call near the databusURI usage in
databusclient/api/download.py to use f"QUERY {databusURI.replace('\\n', ' ')}"
(or equivalent) so the URI is shown correctly.
try:
    with _open_reader(source_path, source_format) as reader:
        with _open_writer(target_path, target_format) as writer:
            while True:
                chunk = reader.read(FileConverter.CHUNK_SIZE)
                if not chunk:
                    break
                writer.write(chunk)

    os.remove(source_path)
    print(f"Conversion complete: {os.path.basename(target_path)}")
except Exception as e:
    if os.path.exists(target_path):
        os.remove(target_path)
    raise RuntimeError(f"Compression conversion failed: {e}")
convert_file exception handler catches its own os.remove(source_path) failure.
If the I/O conversion succeeds but os.remove(source_path) on Line 316 raises (e.g., permission error), the exception handler on Line 318 will also try to remove target_path — deleting the successfully converted output. os.remove(source_path) should be outside the try block.
🐛 Proposed fix
try:
with _open_reader(source_path, source_format) as reader:
with _open_writer(target_path, target_format) as writer:
while True:
chunk = reader.read(FileConverter.CHUNK_SIZE)
if not chunk:
break
writer.write(chunk)
-
- os.remove(source_path)
- print(f"Conversion complete: {os.path.basename(target_path)}")
except Exception as e:
if os.path.exists(target_path):
os.remove(target_path)
raise RuntimeError(f"Compression conversion failed: {e}")
+
+ os.remove(source_path)
+ print(f"Conversion complete: {os.path.basename(target_path)}")🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@databusclient/extensions/file_converter.py` around lines 307 - 321, The try
block in convert_file currently covers both the read/write loop and
os.remove(source_path), so if removing the source fails you end up treating it
as a conversion failure and deleting the target; to fix, limit the try/except to
only the conversion (the with _open_reader/_open_writer and read/write loop) and
move os.remove(source_path) outside that try (or into a separate post-success
block); keep the except behavior that removes target_path if a conversion
exception occurred, but do not remove target_path when source deletion fails —
instead handle source removal failure separately (log or raise a distinct error)
so successful outputs aren't deleted; refer to convert_file, _open_reader,
_open_writer, and FileConverter.CHUNK_SIZE to locate the code to change.
def _wrap_reader(stream: BinaryIO, fmt: str):
    """Wrap *stream* so that ``read()`` yields decompressed bytes."""
    if fmt == "none":
        return stream
    if fmt == "zstd" and _HAS_ZSTD:
        dctx = _zstd.ZstdDecompressor()
        return dctx.stream_reader(stream)
    return COMPRESSION_MODULES[fmt].open(stream, "rb")
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# First, let's look at the file structure and the specific functions
cat -n databusclient/extensions/file_converter.py | sed -n '450,480p'

Repository: dbpedia/databus-python-client
Length of output: 992
🏁 Script executed:
#!/bin/bash
# Also check the convert_stream function around line 375
cat -n databusclient/extensions/file_converter.py | sed -n '370,410p'

Repository: dbpedia/databus-python-client
Length of output: 1848
🏁 Script executed:
#!/bin/bash
# Get more context on how reader/writer are used in convert_stream
rg -n 'def convert_stream|reader\.close|writer\.close|_wrap_reader|_wrap_writer' databusclient/extensions/file_converter.py -A 2

Repository: dbpedia/databus-python-client
Length of output: 776
🏁 Script executed:
#!/bin/bash
# Get the full convert_stream function
cat -n databusclient/extensions/file_converter.py | sed -n '328,380p'

Repository: dbpedia/databus-python-client
Length of output: 2498
🏁 Script executed:
#!/bin/bash
# Check what _NullCtx does (used by _wrap_writer for "none" format)
cat -n databusclient/extensions/file_converter.py | sed -n '440,453p'

Repository: dbpedia/databus-python-client
Length of output: 438
🏁 Script executed:
#!/bin/bash
# Check imports to understand zstd usage
head -50 databusclient/extensions/file_converter.py | grep -n 'import\|zstd'

Repository: dbpedia/databus-python-client
Length of output: 499
🏁 Script executed:
#!/bin/bash
# Search for tests related to convert_stream and zstd
find . -name '*test*.py' -type f | head -20

Repository: dbpedia/databus-python-client
Length of output: 240
🏁 Script executed:
#!/bin/bash
# Look for any test files that test convert_stream
fd -e py test | xargs rg -l 'convert_stream|_wrap_reader|_wrap_writer' 2>/dev/null

Repository: dbpedia/databus-python-client
Length of output: 101
🌐 Web query:
zstandard Python library stream_reader close() behavior underlying stream
💡 Result:
In python-zstandard, the object returned by ZstdDecompressor.stream_reader(source, ...) / ZstdCompressor.stream_reader(source, ...) is an io.RawIOBase-style wrapper, and calling close() on it will (by default) also call close() on the underlying source stream.
- The `stream_reader()` APIs have a `closefd` parameter: `closefd=True` means closing the zstd reader closes the underlying stream; `closefd=False` prevents that. [1]
- This default behavior was changed so that `stream_reader()` now defaults to closing the source stream when the wrapper is closed, and you must pass `closefd=False` if you don't want the underlying stream closed. [2]
So, if you need to keep the underlying stream open:
reader = zstandard.ZstdDecompressor().stream_reader(fh, closefd=False)
...
reader.close() # fh remains openSources: [1] [2]
🌐 Web query:
zstandard Python library stream_writer context manager underlying stream not closed
💡 Result:
In python-zstandard, ZstdCompressor.stream_writer() returns a ZstdCompressionWriter that does close the underlying writer only if:
- the underlying object has a `.close()` method, and
- `closefd=True` (the default).
Using it as a context manager (with cctx.stream_writer(fh) as compressor:) calls close() on exit, and close() will also call writer.close() when closefd=True. If you pass closefd=False, the underlying stream will be left open. [1]
🏁 Script executed:
#!/bin/bash
# Look at the test file for file_converter
cat -n tests/test_file_converter.py | head -100Repository: dbpedia/databus-python-client
Length of output: 4380
🏁 Script executed:
#!/bin/bash
# Search for any tests that verify stream closing behavior
rg -n 'closefd|close.*stream|input_stream|output_stream' tests/test_file_converter.py

Repository: dbpedia/databus-python-client
Length of output: 55
🏁 Script executed:
#!/bin/bash
# Check if there are any tests that verify the underlying streams aren't closed
rg -A 5 'def test.*convert_stream' tests/test_file_converter.py

Repository: dbpedia/databus-python-client
Length of output: 703
🏁 Script executed:
#!/bin/bash
# Get full test implementations to see what happens after convert_stream
cat -n tests/test_file_converter.py | sed -n '180,250p'

Repository: dbpedia/databus-python-client
Length of output: 2929
🏁 Script executed:
#!/bin/bash
# Also check if there are zstd-specific tests
rg -B 2 -A 15 'test.*zstd|test.*convert_stream' tests/test_file_converter.py | head -80

Repository: dbpedia/databus-python-client
Length of output: 2655
🏁 Script executed:
#!/bin/bash
# Let's verify the standard library behavior for gzip/bz2/lzma with file-like objects
python3 << 'EOF'
import io
import gzip
import bz2
# Test with gzip
print("=== Testing gzip.open() ===")
data = b"test data"
buf = io.BytesIO()
with gzip.open(buf, "wb") as f:
f.write(data)
buf.seek(0)
print(f"Before reading: buf.closed = {buf.closed}")
reader = gzip.open(buf, "rb")
print(f"After gzip.open(buf, 'rb'): buf.closed = {buf.closed}")
content = reader.read()
print(f"After reader.read(): buf.closed = {buf.closed}")
reader.close()
print(f"After reader.close(): buf.closed = {buf.closed}")
# Try to seek on closed BytesIO
try:
buf.seek(0)
print("buf.seek(0) succeeded - buf is still usable")
except ValueError as e:
print(f"buf.seek(0) failed: {e}")
print("\n=== Testing bz2.open() ===")
buf2 = io.BytesIO()
with bz2.open(buf2, "wb") as f:
f.write(data)
buf2.seek(0)
reader2 = bz2.open(buf2, "rb")
reader2.close()
print(f"After reader.close(): buf2.closed = {buf2.closed}")
try:
buf2.seek(0)
print("buf2.seek(0) succeeded - buf2 is still usable")
except ValueError as e:
print(f"buf2.seek(0) failed: {e}")
EOF

Repository: dbpedia/databus-python-client
Length of output: 423
🏁 Script executed:
#!/bin/bash
# Final verification: check if there are any other calls to stream_reader/stream_writer in the codebase
rg -n 'stream_reader|stream_writer' databusclient/

Repository: dbpedia/databus-python-client
Length of output: 423
🏁 Script executed:
#!/bin/bash
# Check the functions at lines 427 and 438
cat -n databusclient/extensions/file_converter.py | sed -n '420,440p'

Repository: dbpedia/databus-python-client
Length of output: 944
🏁 Script executed:
#!/bin/bash
# Get more context to understand these functions
cat -n databusclient/extensions/file_converter.py | sed -n '410,445p'

Repository: dbpedia/databus-python-client
Length of output: 1438
🏁 Script executed:
#!/bin/bash
# Check how _open_reader and _open_writer are used
rg -B 3 -A 5 '_open_reader\(|_open_writer\(' databusclient/extensions/file_converter.py

Repository: dbpedia/databus-python-client
Length of output: 998
🏁 Script executed:
#!/bin/bash
# Let me see the full convert_file function to confirm the context
cat -n databusclient/extensions/file_converter.py | sed -n '300,330p'

Repository: dbpedia/databus-python-client
Length of output: 1488
🏁 Script executed:
#!/bin/bash
# Confirm that _open_reader/_open_writer own the file handles they create
# while _wrap_reader/_wrap_writer receive caller-owned streams
echo "=== _open_reader and _open_writer ==="
echo "These functions OPEN file paths themselves, so they OWN the file handles."
echo "They return context managers that are used as: with _open_reader(...) as reader:"
echo "When used with closefd=True (default), closing releases the file they opened. ✓ CORRECT"
echo ""
echo "=== _wrap_reader and _wrap_writer ==="
echo "These functions receive already-opened BinaryIO objects from the caller."
echo "The caller OWNS these streams and expects them to remain open."
echo "When used in convert_stream, with closefd=True (default), closing also closes caller-owned streams. ✗ WRONG"Repository: dbpedia/databus-python-client
Length of output: 636
Pass closefd=False to zstd stream wrappers in _wrap_reader and _wrap_writer to prevent closing caller-owned streams.
The stream_reader() call at line 461 and stream_writer() call at line 471 default to closefd=True, causing the caller-owned input_stream and output_stream to be closed when the wrappers are closed (line 375 and line 364's with block exit). Since the caller owns these streams, add closefd=False to both:
Required changes
# Line 461 (in _wrap_reader)
return dctx.stream_reader(stream, closefd=False)
# Line 471 (in _wrap_writer)
return cctx.stream_writer(stream, closefd=False)

Note: The similar calls in _open_reader (line 427) and _open_writer (line 438) are correct as-is — those functions own the file handles they create, so closefd=True is appropriate.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@databusclient/extensions/file_converter.py` around lines 455 - 462, The zstd
wrappers in _wrap_reader and _wrap_writer currently call
_zstd.ZstdDecompressor().stream_reader(...) and
_zstd.ZstdCompressor().stream_writer(...) with the default closefd=True which
closes caller-owned streams: update the calls in _wrap_reader (stream_reader)
and _wrap_writer (stream_writer) to pass closefd=False so the wrapper won't
close the underlying BinaryIO owned by the caller; keep existing behavior
elsewhere unchanged.
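A small self-contained check of the closefd behavior discussed above (assumes a python-zstandard version where stream_reader accepts closefd):

import io

import zstandard as zstd

payload = zstd.ZstdCompressor().compress(b"example bytes")
src = io.BytesIO(payload)

reader = zstd.ZstdDecompressor().stream_reader(src, closefd=False)
chunks = []
while True:
    chunk = reader.read(8192)
    if not chunk:
        break
    chunks.append(chunk)
reader.close()

assert b"".join(chunks) == b"example bytes"
assert not src.closed   # the caller-owned stream stays open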
from databusclient.extensions.file_converter import (
    FileConverter,
    COMPRESSION_EXTENSIONS,
    COMPRESSION_MODULES,
)
from databusclient.api.download import (
    _detect_compression_format,
    _should_convert_file,
    _get_converted_filename,
    _convert_compression_format,
)
Unused imports cause pipeline failure (Ruff F401).
COMPRESSION_EXTENSIONS, COMPRESSION_MODULES, _detect_compression_format, _get_converted_filename, and _convert_compression_format are imported but never referenced in the test file. The CI build is failing because of these.
🐛 Proposed fix
from databusclient.extensions.file_converter import (
FileConverter,
- COMPRESSION_EXTENSIONS,
- COMPRESSION_MODULES,
-)
-from databusclient.api.download import (
- _detect_compression_format,
- _should_convert_file,
- _get_converted_filename,
- _convert_compression_format,
)
+from databusclient.api.download import _should_convert_file
 from databusclient.cli import download

🧰 Tools
🪛 GitHub Check: build
[failure] 33-33: Ruff (F401)
tests/test_file_converter.py:33:5: F401 databusclient.api.download._convert_compression_format imported but unused
[failure] 32-32: Ruff (F401)
tests/test_file_converter.py:32:5: F401 databusclient.api.download._get_converted_filename imported but unused
[failure] 30-30: Ruff (F401)
tests/test_file_converter.py:30:5: F401 databusclient.api.download._detect_compression_format imported but unused
[failure] 27-27: Ruff (F401)
tests/test_file_converter.py:27:5: F401 databusclient.extensions.file_converter.COMPRESSION_MODULES imported but unused
[failure] 26-26: Ruff (F401)
tests/test_file_converter.py:26:5: F401 databusclient.extensions.file_converter.COMPRESSION_EXTENSIONS imported but unused
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tests/test_file_converter.py` around lines 24 - 34, The test file imports
several names that are never used causing Ruff F401; remove the unused imports
COMPRESSION_EXTENSIONS and COMPRESSION_MODULES from
databusclient.extensions.file_converter and remove _detect_compression_format,
_get_converted_filename, and _convert_compression_format from
databusclient.api.download so only FileConverter and _should_convert_file (or
whatever names the tests actually use) remain imported, or alternatively
reference those imported symbols in the tests if they are intended to be tested;
update the import block in tests/test_file_converter.py to import only the
symbols actually used (e.g., FileConverter and _should_convert_file) to resolve
the linter error.
Description
This PR adds a new `file_converter.py` extension module to address Issue #54. The module provides a streaming pipeline for file format conversion with support for gzip decompression and checksum validation.

Changes
- `databusclient/extensions/file_converter.py` module
- `FileConverter` class with streaming support
- `decompress_gzip_stream()` method with optional checksum validation
- `compress_gzip_stream()` method for gzip compression
- `validate_checksum_stream()` method for SHA256 checksum validation

Related Issues
Fixes #54
Type of change
Summary by CodeRabbit
New Features
Tests