Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,7 @@
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -141,7 +139,6 @@ enum SettingsHandshake { READY, TRANSMITTED, ACKED }
private EndpointDetails endpointDetails;
private boolean goAwayReceived;

private final Map<Integer, PriorityValue> priorities = new ConcurrentHashMap<>();
private volatile boolean peerNoRfc7540Priorities;


Expand Down Expand Up @@ -1020,6 +1017,9 @@ private void consumeFrame(final RawFrame frame) throws HttpException, IOExceptio
throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid PRIORITY_UPDATE payload");
}
final int prioritizedId = payload.getInt() & 0x7fffffff;
if (prioritizedId == 0) {
throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "PRIORITY_UPDATE stream id is 0");
}
final int len = payload.remaining();
final String field;
if (len > 0) {
Expand All @@ -1029,9 +1029,11 @@ private void consumeFrame(final RawFrame frame) throws HttpException, IOExceptio
} else {
field = "";
}
final PriorityValue pv = PriorityParamsParser.parse(field).toValueWithDefaults();
priorities.put(prioritizedId, pv);
requestSessionOutput();
final PriorityValue pv = parsePriorityValue(field);
final H2Stream prioritizedStream = streams.lookupSeen(prioritizedId);
if (prioritizedStream != null) {
prioritizedStream.setPriorityValue(pv);
}
}
break;
}
Expand Down Expand Up @@ -1106,7 +1108,7 @@ private void consumeHeaderFrame(final RawFrame frame, final H2Stream stream) thr
if (streamListener != null) {
streamListener.onHeaderInput(this, streamId, headers);
}
recordPriorityFromHeaders(streamId, headers);
recordPriorityFromHeaders(stream, headers);
stream.consumeHeader(headers, frame.isFlagSet(FrameFlag.END_STREAM));
} else {
continuation.copyPayload(payload);
Expand All @@ -1125,7 +1127,7 @@ private void consumeContinuationFrame(final RawFrame frame, final H2Stream strea
if (streamListener != null) {
streamListener.onHeaderInput(this, streamId, headers);
}
recordPriorityFromHeaders(streamId, headers);
recordPriorityFromHeaders(stream, headers);
if (continuation.type == FrameType.PUSH_PROMISE.getValue()) {
stream.consumePromise(headers);
} else {
Expand Down Expand Up @@ -1378,19 +1380,27 @@ H2Stream createStream(final H2StreamChannel channel, final H2StreamHandler strea
return stream;
}

private void recordPriorityFromHeaders(final int streamId, final List<? extends Header> headers) {
private void recordPriorityFromHeaders(final H2Stream stream, final List<? extends Header> headers) {
if (headers == null || headers.isEmpty()) {
return;
}
for (final Header h : headers) {
if (HttpHeaders.PRIORITY.equalsIgnoreCase(h.getName())) {
final PriorityValue pv = PriorityParamsParser.parse(h.getValue()).toValueWithDefaults();
priorities.put(streamId, pv);
final PriorityValue pv = parsePriorityValue(h);
stream.setPriorityValue(pv);
break;
}
}
}

private PriorityValue parsePriorityValue(final Header header) {
return PriorityParamsParser.parse(header).toValueWithDefaults();
}

private PriorityValue parsePriorityValue(final String field) {
return PriorityParamsParser.parse(field).toValueWithDefaults();
}

class H2StreamChannelImpl implements H2StreamChannel {

private final int id;
Expand Down Expand Up @@ -1438,18 +1448,21 @@ public void submit(final List<Header> headers, final boolean endStream) throws I
return;
}
ensureNotClosed();
if (peerNoRfc7540Priorities && streams.isSameSide(id)) {
if (peerNoRfc7540Priorities) {
for (final Header h : headers) {
if (HttpHeaders.PRIORITY.equalsIgnoreCase(h.getName())) {
final byte[] ascii = h.getValue() != null
? h.getValue().getBytes(StandardCharsets.US_ASCII)
: new byte[0];

final int sid = id & 0x7fffffff;
final ByteArrayBuffer b = new ByteArrayBuffer(4 + ascii.length);
b.append((byte) (id >> 24));
b.append((byte) (id >> 16));
b.append((byte) (id >> 8));
b.append((byte) id);
b.append((byte) (sid >> 24));
b.append((byte) (sid >> 16));
b.append((byte) (sid >> 8));
b.append((byte) sid);
b.append(ascii, 0, ascii.length);

final ByteBuffer pl = ByteBuffer.wrap(b.array(), 0, b.length());
final RawFrame priUpd = new RawFrame(FrameType.PRIORITY_UPDATE.getValue(), 0, 0, pl);
commitFrameInternal(priUpd);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.hc.core5.http.nio.HandlerFactory;
import org.apache.hc.core5.http2.H2Error;
import org.apache.hc.core5.http2.H2StreamResetException;
import org.apache.hc.core5.http2.priority.PriorityValue;
import org.apache.hc.core5.util.Timeout;

class H2Stream implements StreamControl {
Expand All @@ -64,6 +65,7 @@ class H2Stream implements StreamControl {
private volatile long lastActivityNanos;

private volatile Timeout idleTimeout;
private volatile PriorityValue priorityValue;

H2Stream(final H2StreamChannel channel, final H2StreamHandler handler, final Consumer<State> stateChangeCallback) {
this.channel = channel;
Expand Down Expand Up @@ -313,7 +315,7 @@ public String toString() {
buf.append("[")
.append("id=").append(channel.getId())
.append(", reserved=").append(reserved)
.append(", removeClosed=").append(remoteClosed)
.append(", remoteClosed=").append(remoteClosed)
.append(", localClosed=").append(channel.isLocalClosed())
.append(", localReset=").append(channel.isLocalReset())
.append("]");
Expand All @@ -332,4 +334,12 @@ Timeout getIdleTimeout() {
return idleTimeout;
}

}
PriorityValue getPriorityValue() {
return priorityValue;
}

void setPriorityValue(final PriorityValue priorityValue) {
this.priorityValue = priorityValue;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -792,6 +792,10 @@ void testPriorityUpdateInputAccepted() throws Exception {
h2StreamListener,
() -> streamHandler);

// Make stream id=1 "seen" so lookupSeen(1) does not fail.
final H2StreamChannel channel = mux.createChannel(1);
mux.createStream(channel, streamHandler);

final WritableByteChannelMock writable = new WritableByteChannelMock(1024);
final FrameOutputBuffer fob = new FrameOutputBuffer(16 * 1024);

Expand All @@ -805,16 +809,47 @@ void testPriorityUpdateInputAccepted() throws Exception {
fob.write(priUpd, writable);
final byte[] bytes = writable.toByteArray();

// Should NOT throw; server must accept PRIORITY_UPDATE from client
Assertions.assertDoesNotThrow(() -> mux.onInput(ByteBuffer.wrap(bytes)));

// Listener sees the incoming frame
Mockito.verify(h2StreamListener).onFrameInput(
ArgumentMatchers.same(mux),
ArgumentMatchers.eq(0),
ArgumentMatchers.any());
}

@Test
void testPriorityUpdateInputRejectedForUnseenStream() throws Exception {
final AbstractH2StreamMultiplexer mux = new H2StreamMultiplexerImpl(
protocolIOSession,
FRAME_FACTORY,
StreamIdGenerator.ODD,
httpProcessor,
CharCodingConfig.DEFAULT,
H2Config.custom().setMaxFrameSize(FrameConsts.MIN_FRAME_SIZE).build(),
h2StreamListener,
() -> streamHandler);

final WritableByteChannelMock writable = new WritableByteChannelMock(1024);
final FrameOutputBuffer fob = new FrameOutputBuffer(16 * 1024);

final byte[] ascii = "u=3,i".getBytes(StandardCharsets.US_ASCII);
final ByteBuffer payload = ByteBuffer.allocate(4 + ascii.length);
payload.putInt(1); // prioritized stream id = 1 (unseen)
payload.put(ascii);
payload.flip();

final RawFrame priUpd = new RawFrame(FrameType.PRIORITY_UPDATE.getValue(), 0, 0, payload);
fob.write(priUpd, writable);
final byte[] bytes = writable.toByteArray();

final H2ConnectionException ex = Assertions.assertThrows(
H2ConnectionException.class,
() -> mux.onInput(ByteBuffer.wrap(bytes)));

Assertions.assertEquals("Unexpected stream id: 1", ex.getMessage());
}


// Helper: minimal stream handler that sends our headers once
static final class PriorityHeaderSender implements H2StreamHandler {
private final H2StreamChannel channel;
Expand Down
Loading