Closed
86 changes: 80 additions & 6 deletions src/lib.rs
@@ -72,7 +72,8 @@ pub use error::AsyncHttpRangeReaderError;
/// if response.status() == reqwest::StatusCode::NOT_MODIFIED {
/// Ok(None)
/// } else {
- /// let reader = AsyncHttpRangeReader::from_head_response(client, response, HeaderMap::default()).await?;
+ /// let url = response.url().clone();
+ /// let reader = AsyncHttpRangeReader::from_head_response(client, response, url, HeaderMap::default()).await?;
Comment on lines +75 to +76
Contributor Author

@zanieb zanieb Apr 8, 2024

This, in particular, is awkward. Perhaps we should have a different API instead? Like AsyncHttpRangeReader::from_head_response_custom_url?

As with my comments in #9, I feel it'd be nice to have some sort of Builder type that can be constructed and mutated before the AsyncHttpRangeReader itself is created.

Collaborator

Yeah, I agree that a builder would be a lot nicer now that the options are increasing. Would you be able to whip something up?

Contributor Author

I can look into it! It might not be quick though :)

Sorry if I'm overstepping, but I felt like trying to make a Builder, so I opened #15 😄

Contributor Author

Oh thanks! I had just asked @charliermarsh to look into this because I was too busy. We'll review it :)
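
For illustration only, here is a minimal sketch of what a builder along these lines could look like. It is not the design from #15. `RangeReaderBuilder`, `RangeReaderOptions`, the `Tail` variant, the string-based URL and header types, and the chosen defaults are all assumptions made to keep the example free of external crates; only `CheckSupportMethod::Head` and `RangeRequestUrlSource` are names taken from this diff.

```rust
/// Simplified stand-in for the crate's enum. `Head` appears in this diff;
/// `Tail` is a placeholder name for a range-based check (an assumption).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CheckSupportMethod {
    Tail,
    Head,
}

/// Mirrors the enum introduced in this diff.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RangeRequestUrlSource {
    Request,
    Response,
}

/// Hypothetical bag of options that a reader constructor could consume.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RangeReaderOptions {
    pub url: String,
    pub check_method: CheckSupportMethod,
    pub url_source: RangeRequestUrlSource,
    pub extra_headers: Vec<(String, String)>,
}

/// Hypothetical builder: every option has a default, so adding a new option
/// later does not change the signature of any constructor.
#[derive(Debug, Clone)]
pub struct RangeReaderBuilder {
    options: RangeReaderOptions,
}

impl RangeReaderBuilder {
    /// Start from a URL; the defaults chosen here are assumptions.
    pub fn new(url: impl Into<String>) -> Self {
        Self {
            options: RangeReaderOptions {
                url: url.into(),
                check_method: CheckSupportMethod::Head,
                url_source: RangeRequestUrlSource::Request,
                extra_headers: Vec::new(),
            },
        }
    }

    /// Choose how range support is probed.
    pub fn check_method(mut self, method: CheckSupportMethod) -> Self {
        self.options.check_method = method;
        self
    }

    /// Choose which URL later range requests are sent to.
    pub fn url_source(mut self, source: RangeRequestUrlSource) -> Self {
        self.options.url_source = source;
        self
    }

    /// Append one extra header to send with every request.
    pub fn header(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
        self.options.extra_headers.push((name.into(), value.into()));
        self
    }

    /// Finish configuration and hand back the collected options.
    pub fn build(self) -> RangeReaderOptions {
        self.options
    }
}
```

A caller would chain only the options it cares about, for example `RangeReaderBuilder::new(url).url_source(RangeRequestUrlSource::Response).build()`, which avoids threading an extra positional `url` argument through `from_head_response`.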

/// Ok(Some(reader))
/// }
/// }
@@ -131,6 +132,15 @@ pub enum CheckSupportMethod {
Head,
}

+ /// Which URL should be used for subsequent range requests?
+ pub enum RangeRequestUrlSource {
Contributor Author

I don't love this name

+ /// Use the initial request URL
+ Request,
+
+ /// Use the initial response URL
+ Response,
Contributor

Is it ever correct to use the response URL?

Contributor Author

@zanieb zanieb Apr 9, 2024

That's a good question, is there a RFC about HEAD/GET range requests?

(this implementation is based on @baszalmstra's comment that either may be preferred)

Collaborator

I think this only fails when signed redirects are used. In other cases, the extra hops might be avoidable.

Contributor Author

That makes sense; it's definitely less efficient to do an extra hop on every subsequent request. I guess it's possible something would return a 302 for HEAD but not for GET, but that seems really weird?

Collaborator

Maybe we could explain the tradeoff in the comments?

Contributor Author

Yeah I can definitely explain the cases.

"That's a good question, is there a RFC about HEAD/GET range requests?"

@zanieb Range requests seem to be covered by section 14 of the HTTP Semantics RFC: https://datatracker.ietf.org/doc/html/rfc9110#name-range-requests

(I first found https://datatracker.ietf.org/doc/html/rfc7233, but it says that it's obsoleted by RFC9110)
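
As a small illustration of the header syntax that section of RFC 9110 defines, the sketch below builds `Range` request values and parses a `Content-Range` response value. The helper names are hypothetical and this is not code from the crate; it only handles the `bytes` unit and ignores the unsatisfied-range form (`bytes */1234`).

```rust
/// Build a `Range` header value for an inclusive byte interval,
/// e.g. `bytes=0-499` for the first 500 bytes.
pub fn byte_range(first: u64, last: u64) -> String {
    format!("bytes={}-{}", first, last)
}

/// Build a suffix `Range` header value, e.g. `bytes=-500` for the
/// last 500 bytes of the representation.
pub fn suffix_range(length: u64) -> String {
    format!("bytes=-{}", length)
}

/// Parse a `Content-Range` value such as `bytes 0-499/1234`.
/// Returns (first, last, complete_length); the length is `None`
/// when the server sends `*` for an unknown total size.
pub fn parse_content_range(value: &str) -> Option<(u64, u64, Option<u64>)> {
    let rest = value.strip_prefix("bytes ")?;
    let (range, total) = rest.split_once('/')?;
    let (first, last) = range.split_once('-')?;
    let total: Option<u64> = if total == "*" {
        None
    } else {
        Some(total.parse::<u64>().ok()?)
    };
    Some((first.parse::<u64>().ok()?, last.parse::<u64>().ok()?, total))
}
```

A suffix range is useful for formats that keep their index at the end of the file, since the total size can then be read back from the `Content-Range` of the response.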

"Is it ever correct to use the response URL?"

To me it feels like an optimization that I can't find discussed in the HTTP RFC. If I'm right about that, it probably shouldn't be the default, but rather something you opt into if you know your application will never hit cases like the Amazon links.

Collaborator

Yeah, after reading up on this some more, I think I agree. @zanieb If you need a quick fix, I would be happy to change the default for now without it being configurable.

Contributor

I think if we shipped a change to the default here, I'd probably upgrade us immediately since we still see reports around this.
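
To make the tradeoff in this thread concrete, here is a minimal sketch of the selection logic with the request URL as the default, as suggested above. It uses plain string slices instead of `reqwest::Url`, and `range_request_url` plus the `Default` impl are assumptions for illustration, not code from this PR.

```rust
/// Mirrors the enum from this diff, with the tradeoff spelled out.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RangeRequestUrlSource {
    /// Send every range request to the URL the caller supplied.
    /// Each request follows any redirects again (extra hops), but this
    /// keeps working when the redirect target is a signed or
    /// short-lived URL.
    Request,
    /// Send range requests to the final URL of the initial response.
    /// This skips the redirect hops, but can fail when the target URL
    /// is only valid for the original request.
    Response,
}

/// Assumed default for this sketch: the conservative choice.
impl Default for RangeRequestUrlSource {
    fn default() -> Self {
        RangeRequestUrlSource::Request
    }
}

/// Pick the URL for subsequent range requests.
pub fn range_request_url<'a>(
    source: RangeRequestUrlSource,
    request_url: &'a str,
    response_url: &'a str,
) -> &'a str {
    match source {
        RangeRequestUrlSource::Request => request_url,
        RangeRequestUrlSource::Response => response_url,
    }
}
```

With this shape, the response URL becomes an explicit opt-in for callers who know their servers never hand out signed redirects.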

+ }

fn error_for_status(response: reqwest::Response) -> reqwest_middleware::Result<Response> {
response
.error_for_status()
@@ -143,6 +153,7 @@ impl AsyncHttpRangeReader {
client: impl Into<reqwest_middleware::ClientWithMiddleware>,
url: reqwest::Url,
check_method: CheckSupportMethod,
+ range_request_url_source: RangeRequestUrlSource,
extra_headers: HeaderMap,
) -> Result<(Self, HeaderMap), AsyncHttpRangeReaderError> {
let client = client.into();
@@ -156,15 +167,23 @@ impl AsyncHttpRangeReader {
)
.await?;
let response_headers = response.headers().clone();
- let self_ = Self::from_tail_response(client, response, extra_headers).await?;
+ let url = match range_request_url_source {
+ RangeRequestUrlSource::Request => url,
+ RangeRequestUrlSource::Response => response.url().clone(),
+ };
+ let self_ = Self::from_tail_response(client, response, url, extra_headers).await?;
Ok((self_, response_headers))
}
CheckSupportMethod::Head => {
let response =
Self::initial_head_request(client.clone(), url.clone(), HeaderMap::default())
.await?;
let response_headers = response.headers().clone();
- let self_ = Self::from_head_response(client, response, extra_headers).await?;
+ let url = match range_request_url_source {
+ RangeRequestUrlSource::Request => url,
+ RangeRequestUrlSource::Response => response.url().clone(),
+ };
+ let self_ = Self::from_head_response(client, response, url, extra_headers).await?;
Ok((self_, response_headers))
}
}
@@ -200,6 +219,7 @@ impl AsyncHttpRangeReader {
pub async fn from_tail_response(
client: impl Into<reqwest_middleware::ClientWithMiddleware>,
tail_request_response: Response,
+ url: Url,
extra_headers: HeaderMap,
) -> Result<Self, AsyncHttpRangeReaderError> {
let client = client.into();
@@ -245,7 +265,7 @@ impl AsyncHttpRangeReader {
let (state_tx, state_rx) = watch::channel(StreamerState::default());
tokio::spawn(run_streamer(
client,
- tail_request_response.url().clone(),
+ url,
extra_headers,
Some((tail_request_response, start)),
memory_map,
@@ -300,6 +320,7 @@ impl AsyncHttpRangeReader {
pub async fn from_head_response(
client: impl Into<reqwest_middleware::ClientWithMiddleware>,
head_response: Response,
+ url: Url,
extra_headers: HeaderMap,
) -> Result<Self, AsyncHttpRangeReaderError> {
let client = client.into();
@@ -345,7 +366,7 @@ impl AsyncHttpRangeReader {
let (state_tx, state_rx) = watch::channel(StreamerState::default());
tokio::spawn(run_streamer(
client,
- head_response.url().clone(),
+ url,
extra_headers,
None,
memory_map,
@@ -688,6 +709,7 @@ mod test {
Client::new(),
server.url().join("andes-1.8.3-pyhd8ed1ab_0.conda").unwrap(),
check_method,
+ RangeRequestUrlSource::Response,
HeaderMap::default(),
)
.await
@@ -728,7 +750,7 @@ mod test {
);

// Prefetch the data for the metadata.json file
- let entry = reader.file().entries().get(0).unwrap();
+ let entry = reader.file().entries().first().unwrap();
let offset = entry.header_offset();
// Get the size of the entry plus the header + size of the filename. We should also actually
// include bytes for the extra fields but we don't have that information.
@@ -783,6 +805,57 @@ mod test {
Client::new(),
server.url().join("andes-1.8.3-pyhd8ed1ab_0.conda").unwrap(),
check_method,
+ RangeRequestUrlSource::Response,
+ HeaderMap::default(),
+ )
+ .await
+ .expect("bla");
+
+ // Also open a simple file reader
+ let mut file = tokio::fs::File::open(path.join("andes-1.8.3-pyhd8ed1ab_0.conda"))
+ .await
+ .unwrap();
+
+ // Read until the end and make sure that the contents matches
+ let mut range_read = vec![0; 64 * 1024];
+ let mut file_read = vec![0; 64 * 1024];
+ loop {
+ // Read with the async reader
+ let range_read_bytes = range.read(&mut range_read).await.unwrap();
+
+ // Read directly from the file
+ let file_read_bytes = file
+ .read_exact(&mut file_read[0..range_read_bytes])
+ .await
+ .unwrap();
+
+ assert_eq!(range_read_bytes, file_read_bytes);
+ assert_eq!(
+ range_read[0..range_read_bytes],
+ file_read[0..file_read_bytes]
+ );
+
+ if file_read_bytes == 0 && range_read_bytes == 0 {
+ break;
+ }
+ }
+ }
+
+ #[rstest]
+ #[case(RangeRequestUrlSource::Request)]
+ #[case(RangeRequestUrlSource::Response)]
+ #[tokio::test]
+ async fn async_range_reader_url_source(#[case] url_source: RangeRequestUrlSource) {
Contributor Author

I tried to set up a test case that used redirects but it was kind of a pain? I'm not sure what kind of testing you think is valuable here.

After I made #15, I also added a commit that has a test for this in my own fork. It adds an endpoint to the unit-test http server that redirects based on the HTTP method used, and that target URL returns 405 Method Not Allowed if called with another method: torarvid#1

(Didn't want to add it to #15 to keep the PR size manageable, but I can if it's wanted)
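
A rough sketch of the routing idea described above, written as a pure function so it can be checked without a server. This is not the code from the fork: the paths `/redirect`, `/head-only`, and `/get-only`, the `Reply` type, and the `route` function are all hypothetical names chosen for illustration.

```rust
/// Simplified response type for the sketch (hypothetical).
#[derive(Debug, PartialEq, Eq)]
pub enum Reply {
    /// 302 Found with the given `Location`.
    Redirect(String),
    /// 200 OK.
    Ok,
    /// 405 Method Not Allowed.
    MethodNotAllowed,
    /// 404 Not Found.
    NotFound,
}

/// Route a request: `/redirect` sends each method to its own target,
/// and each target rejects every other method with 405.
pub fn route(method: &str, path: &str) -> Reply {
    match (method, path) {
        ("HEAD", "/redirect") => Reply::Redirect("/head-only".to_string()),
        ("GET", "/redirect") => Reply::Redirect("/get-only".to_string()),
        ("HEAD", "/head-only") | ("GET", "/get-only") => Reply::Ok,
        (_, "/head-only") | (_, "/get-only") | (_, "/redirect") => Reply::MethodNotAllowed,
        _ => Reply::NotFound,
    }
}
```

Under these assumptions, a reader that probes with HEAD and then reuses the response URL would send its GET range requests to `/head-only` and receive 405, while one that reuses the request URL would be redirected to `/get-only` and succeed. That difference is what a test parameterized over `RangeRequestUrlSource` could assert.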

+ // Spawn a static file server
+ let path = Path::new(&std::env::var("CARGO_MANIFEST_DIR").unwrap()).join("test-data");
+ let server = StaticDirectoryServer::new(&path);
+
+ // Construct an AsyncRangeReader
+ let (mut range, _) = AsyncHttpRangeReader::new(
+ Client::new(),
+ server.url().join("andes-1.8.3-pyhd8ed1ab_0.conda").unwrap(),
+ CheckSupportMethod::Head,
+ url_source,
HeaderMap::default(),
)
.await
@@ -825,6 +898,7 @@ mod test {
Client::new(),
server.url().join("not-found").unwrap(),
CheckSupportMethod::Head,
+ RangeRequestUrlSource::Response,
HeaderMap::default(),
)
.await