Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release reader immediately when shutting down a pipe #1208

Draft
wants to merge 17 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions index.bs
Original file line number Diff line number Diff line change
Expand Up @@ -2198,6 +2198,17 @@ The following abstract operations operate on {{ReadableStream}} instances at a h
|originalError|, then:
1. If |shuttingDown| is true, abort these substeps.
1. Set |shuttingDown| to true.
1. If |reader| [=implements=] {{ReadableStreamBYOBReader}}, perform
! [$ReadableStreamBYOBReaderRelease$](|reader|).
1. Otherwise, perform ! [$ReadableStreamDefaultReaderRelease$](|reader|).
MattiasBuelens marked this conversation as resolved.
Show resolved Hide resolved
1. Set |reader| to ! [$AcquireReadableStreamDefaultReader$](|source|).
<p class="note">The initial reader is released to ensure that any pending read requests
are immediately aborted, and no more chunks are pulled from |source|. A new reader is
acquired in order to keep |source| locked until the shutdown is [=finalized=], for example
to [=cancel a readable stream|cancel=] |source| if necessary.
This exchange of readers is not observable to author code and the user agent is free to
implement this differently, for example by keeping the same reader and internally aborting
its pending read requests.
1. If |dest|.[=WritableStream/[[state]]=] is "`writable`" and !
[$WritableStreamCloseQueuedOrInFlight$](|dest|) is false,
1. If any [=chunks=] have been read but not yet written, write them to |dest|.
Expand All @@ -2210,6 +2221,10 @@ The following abstract operations operate on {{ReadableStream}} instances at a h
ask to shutdown, optionally with an error |error|, then:
1. If |shuttingDown| is true, abort these substeps.
1. Set |shuttingDown| to true.
1. If |reader| [=implements=] {{ReadableStreamBYOBReader}}, perform
MattiasBuelens marked this conversation as resolved.
Show resolved Hide resolved
! [$ReadableStreamBYOBReaderRelease$](|reader|).
1. Otherwise, perform ! [$ReadableStreamDefaultReaderRelease$](|reader|).
1. Set |reader| to ! [$AcquireReadableStreamDefaultReader$](|source|).
1. If |dest|.[=WritableStream/[[state]]=] is "`writable`" and !
[$WritableStreamCloseQueuedOrInFlight$](|dest|) is false,
1. If any [=chunks=] have been read but not yet written, write them to |dest|.
Expand Down
12 changes: 11 additions & 1 deletion reference-implementation/lib/abstract-ops/readable-streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC
assert(IsReadableStreamLocked(source) === false);
assert(IsWritableStreamLocked(dest) === false);

const reader = AcquireReadableStreamDefaultReader(source);
let reader = AcquireReadableStreamDefaultReader(source);
const writer = AcquireWritableStreamDefaultWriter(dest);

source._disturbed = true;
Expand Down Expand Up @@ -200,6 +200,12 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC
}

return transformPromiseWith(writer._readyPromise, () => {
if (shuttingDown === true) {
return promiseResolvedWith(true);
}
if (dest._state !== 'writable' || WritableStreamCloseQueuedOrInFlight(dest) === true) {
return promiseResolvedWith(true);
}
Comment on lines +207 to +209
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This implements @domenic's suggestion from #1207 (comment).

I don't know if we need to update the spec text for this. It already specifies that these checks must happen before performing any reads and writes:

Shutdown must stop activity: if shuttingDown becomes true, the user agent must not initiate further reads from reader, and must only perform writes of already-read chunks, as described below. In particular, the user agent must check the below conditions before performing any reads or writes, since they might lead to immediate shutdown.

We should still add a test for this particular case (although that might not be easy looking at the discussion in #1207).

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I personally don't think we need to update the spec text.

return new Promise((resolveRead, rejectRead) => {
ReadableStreamDefaultReaderRead(
reader,
Expand Down Expand Up @@ -289,6 +295,8 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC
return;
}
shuttingDown = true;
ReadableStreamDefaultReaderRelease(reader);
reader = AcquireReadableStreamDefaultReader(source);

if (dest._state === 'writable' && WritableStreamCloseQueuedOrInFlight(dest) === false) {
uponFulfillment(waitForWritesToFinish(), doTheRest);
Expand All @@ -310,6 +318,8 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC
return;
}
shuttingDown = true;
ReadableStreamDefaultReaderRelease(reader);
reader = AcquireReadableStreamDefaultReader(source);

if (dest._state === 'writable' && WritableStreamCloseQueuedOrInFlight(dest) === false) {
uponFulfillment(waitForWritesToFinish(), () => finalize(isError, error));
Expand Down
2 changes: 1 addition & 1 deletion reference-implementation/web-platform-tests