I recently began experimenting with async iterators, using the --harmony flag in Node 9.5. After reading through the MDN docs, I figured that it might be interesting to make a function that takes the values yielded by an async generator function and serves them as an SSE event stream. The following is a simple, contrived example:
const http = require("http");

const timer = time =>
  new Promise(resolve => setTimeout(resolve, time));

async function* counter() {
  let counter = 0;
  while (true) {
    await timer(5000);
    yield counter++;
  }
}

async function iterStream(res, iter) {
  res.writeHead(200, {"Content-Type": "text/event-stream"});
  for await (let item of iter)
    res.write(`event: counter\ndata: ${item}\n\n`);
}

http.createServer((req, res) =>
  iterStream(res, counter())).listen(8000);
The iterator yields a new value from the counter every five seconds. The iterStream function takes each new value and broadcasts it to the connected user. With an EventSource on the client side, I received each incrementing value as the event's data:
let events = new EventSource("/");
events.addEventListener("counter", ({data}) => console.log(data));
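The wire format that iterStream writes can be made explicit with a small helper. The following is a minimal sketch: sseFrame is a hypothetical name, not part of the example above, and it assumes the payload is a string or something that converts cleanly to one. It also covers multi-line payloads, which need one data: field per line.

```javascript
// Hypothetical helper (not part of the original example): formats one
// Server-Sent Events message. Each line of the payload gets its own
// "data:" field, and a blank line terminates the message.
function sseFrame(event, data) {
  const lines = String(data).split(/\r\n|\r|\n/);
  return `event: ${event}\n` +
    lines.map(line => `data: ${line}\n`).join("") +
    "\n";
}
```

With this helper, the write inside iterStream would become res.write(sseFrame("counter", item)).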
Next, a slightly less contrived example. Instead of using a timer to introduce an artificial delay between items, I'm going to use a RethinkDB query with a changefeed, which broadcasts live updates from the database:
const http = require("http");
const r = require("rethinkdbdash")();

async function* query() {
  let cursor = await r.db("rethinkdb").table("stats").changes();
  while (true)
    yield cursor.next();
}

async function iterStream(res, iter) {
  res.writeHead(200, {"Content-Type": "text/event-stream"});
  for await (let item of iter)
    res.write(`event: change\ndata: ${JSON.stringify(item)}\n\n`);
}

http.createServer((req, res) =>
  iterStream(res, query())).listen(8000);
The query function waits for and yields new values from the database in a loop. For reference, the client library's cursor does not conform to the iteration protocol; it just (coincidentally) happens to use next as the name of the method that retrieves a new item. The cursor.next method returns a promise that resolves when a new item is available from the database.
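Because the cursor only resembles the iteration protocol, one option is to wrap it explicitly instead of looping inside a generator. The sketch below assumes nothing about the cursor beyond a next method that returns a promise for the next item and a close method. The names cursorToAsyncIterable, fakeCursor, and demo are hypothetical, and fakeCursor merely stands in for a real database cursor.

```javascript
// Wraps any object exposing next() -> Promise<item> and close() in an
// object that follows the async iteration protocol. Nothing here is
// specific to RethinkDB; the cursor shape is an assumption.
function cursorToAsyncIterable(cursor) {
  return {
    [Symbol.asyncIterator]() { return this; },
    // Protocol next(): resolve to {value, done} rather than the bare item.
    async next() {
      return {value: await cursor.next(), done: false};
    },
    // Protocol return(): for await...of calls this on break or throw.
    async return(value) {
      await cursor.close();
      return {value, done: true};
    }
  };
}

// Hypothetical stand-in for a database cursor, used only for the demo.
function fakeCursor(items) {
  let closed = false;
  return {
    next: async () => items.shift(),
    close: async () => { closed = true; },
    isClosed: () => closed
  };
}

async function demo() {
  const cursor = fakeCursor(["a", "b", "c"]);
  const seen = [];
  for await (const item of cursorToAsyncIterable(cursor)) {
    seen.push(item);
    if (seen.length === 2) break; // leaving the loop early triggers return()
  }
  return {seen, closed: cursor.isClosed()};
}
```

Leaving the loop early closes the cursor through return, so cleanup does not depend on a finally block inside a generator.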
There's a minor wrinkle, however: the database connection continues to stream cursor results even after the user disconnects. I figured that I could address the problem by invoking iter.return in my iterStream function when the connection closes. When the iterator terminates, I can use a finally block in my query function to close the database cursor:
async function* query() {
  let cursor = await r.db("test").table("test").changes();
  try {
    while (true)
      yield cursor.next();
  }
  finally {
    console.log("Closing the cursor");
    cursor.close();
  }
}

async function iterStream(res, iter) {
  res.writeHead(200, {"Content-Type": "text/event-stream"});
  res.connection.on("close", () => iter.return());
  for await (let item of iter)
    res.write(`event: change\ndata: ${JSON.stringify(item)}\n\n`);
}
This approach seems to work, but not exactly as I expected. When the user closes their connection and iter.return forces the generator to finish, the pending cursor.next call sits there and waits until one more item comes in before the finally block executes; it doesn't interrupt the pending promise.
I tried using iter.throw instead of iter.return to see if it would give me the desired behavior, but it does basically the same thing: with iter.throw, a catch block in the query function doesn't execute until the next yield.
This same behavior is reproducible with the timed counter example:
async function* test() {
  let n = 0;
  try {
    while (true) {
      console.log("Awaiting the timer");
      await timer(5000);
      console.log("Yielding a new item");
      yield n++;
    }
  }
  finally {
    console.log("Iterator finished");
  }
}

async function iterStream(res, iter) {
  res.writeHead(200, {"Content-Type": "text/event-stream"});
  res.connection.on("close", () => {
    console.log("Connection closed");
    iter.return();
  });
  for await (let item of iter) {
    console.log("Sending item:", item);
    res.write(`event: item\ndata: ${item}\n\n`);
  }
}
When I run the example above and disconnect after receiving the third item (item 2), the resulting output looks like this:
Awaiting the timer
Yielding a new item
Sending item: 0
Awaiting the timer
Yielding a new item
Sending item: 1
Awaiting the timer
Yielding a new item
Sending item: 2
Awaiting the timer
Connection closed
Yielding a new item
Iterator finished
Sending item: 3
After the connection closes, the finally block doesn't execute until the pending timer completes.
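One possible way around the wait, sketched here under assumptions rather than taken from any documentation, is to let the generator race its pending promise against a cancellation promise, so that it wakes up and returns on its own. The names cancelSignal, CANCELLED, and demo are hypothetical, the delay and log parameters exist only to make the sketch observable, and the short timings are illustrative. Note that the losing timer still runs to completion in the background; only the generator stops waiting for it.

```javascript
const timer = time =>
  new Promise(resolve => setTimeout(resolve, time));

// Unique marker so a cancellation can be told apart from a timer tick.
const CANCELLED = Symbol("cancelled");

// Hypothetical helper: returns a promise and the function that resolves it.
function cancelSignal() {
  let cancel;
  const cancelled = new Promise(resolve => {
    cancel = () => resolve(CANCELLED);
  });
  return {cancelled, cancel};
}

async function* test(cancelled, delay, log) {
  let n = 0;
  try {
    while (true) {
      // Wake up on whichever settles first: the timer or the cancellation.
      const winner = await Promise.race([timer(delay), cancelled]);
      if (winner === CANCELLED) return;
      yield n++;
    }
  }
  finally {
    log.push("Iterator finished");
  }
}

// Simulates a client that disconnects while the generator is waiting.
async function demo() {
  const log = [];
  const {cancelled, cancel} = cancelSignal();
  setTimeout(cancel, 125); // stands in for the "close" event handler
  for await (const item of test(cancelled, 50, log))
    log.push(`Sending item: ${item}`);
  log.push("Loop ended");
  return log;
}
```

In iterStream, the close handler would call cancel() rather than relying on iter.return() alone. With a database cursor, the same race would wrap cursor.next(), and the finally block would still close the cursor.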
Am I missing something? Is there another approach that I could use to address connection termination in this example while still using async iterators?