Jacob Paris

Stream BullMQ job progress with Remix EventSources

Streaming usually brings video or audio to mind, but the same idea works for any data a server sends to a client over time. In this guide, we'll look at how to stream the progress of a BullMQ job to the client.

BullMQ is a modern, fast, and robust queue system for Node.js. It is a successor to the popular Bull library, and is built on top of Redis. If you've integrated BullMQ into your app, you can create a resource route that streams the progress of a running job to the client.

Create a new resource route at app/routes/jobs.$id.progress.tsx

This route uses the eventStream utility from remix-utils to open a server-sent events response. It listens for progress events from the job queue and forwards each one to the client.
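For context, an event source is plain text on the wire: each message is an optional event: line, one or more data: lines, and a blank line that ends the frame. The sketch below shows roughly what a helper like eventStream has to write for each send call. formatSseFrame is a hypothetical name used for illustration and is not part of remix-utils.

```typescript
// Hypothetical helper (not part of remix-utils): formats one
// server-sent event frame as it appears on the wire.
interface SseMessage {
  event?: string
  data: string
}

function formatSseFrame({ event, data }: SseMessage): string {
  const lines: string[] = []
  // A named event lets the client subscribe to just that event type.
  if (event) lines.push(`event: ${event}`)
  // Each line of the payload needs its own "data:" field.
  for (const line of data.split("\n")) {
    lines.push(`data: ${line}`)
  }
  // A blank line terminates the frame.
  return lines.join("\n") + "\n\n"
}
```

eventStream takes care of this framing, so the loader below only has to call send with an event name and a string payload.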

import type { LoaderArgs } from "@remix-run/node"
import { eventStream } from "remix-utils"
import { processItemQueue } from "~/workers/processItem.server"

export async function loader({
  request,
  params,
}: LoaderArgs) {
  const id = params.id as string
  if (!id) {
    return new Response("Not found", { status: 404 })
  }

  const job = await processItemQueue.getJob(id)
  if (!job) {
    return new Response("Not found", { status: 404 })
  }

  return eventStream(request.signal, function setup(send) {
    // If the job finished before the client connected, no further
    // progress events will fire, so report 100 right away.
    job.isCompleted().then((completed) => {
      if (completed) {
        send({ event: "progress", data: String(100) })
      }
    })

    processItemQueue.events.addListener(
      "progress",
      onProgress,
    )

    function onProgress({
      jobId,
      data,
    }: {
      jobId: string
      data: number | object
    }) {
      // The queue emits progress for every job, so ignore the others.
      if (jobId !== id) return

      // String(object) would send "[object Object]", so serialize
      // object payloads as JSON instead.
      send({
        event: "progress",
        data:
          typeof data === "number"
            ? String(data)
            : JSON.stringify(data),
      })

      if (data === 100) {
        console.log("progress is 100, removing listener")
        processItemQueue.events.removeListener(
          "progress",
          onProgress,
        )
      }
    }

    // Runs when the client disconnects and the request is aborted.
    return function clear() {
      processItemQueue.events.removeListener(
        "progress",
        onProgress,
      )
    }
  })
}
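The listener pattern in the loader can be tried without Redis. The sketch below swaps the queue's event emitter for a plain Node EventEmitter, an assumption made so the example runs standalone. watchJob is a hypothetical helper name, and the sketch serializes object payloads as JSON, which is one possible choice rather than something BullMQ requires.

```typescript
import { EventEmitter } from "node:events"

type ProgressEvent = { jobId: string; data: number | object }

// Hypothetical helper: forwards one job's progress from a shared
// emitter to `send`, and returns a cleanup function.
function watchJob(
  events: EventEmitter,
  id: string,
  send: (value: string) => void,
): () => void {
  function onProgress({ jobId, data }: ProgressEvent) {
    // The emitter carries progress for every job; skip the others.
    if (jobId !== id) return
    send(typeof data === "number" ? String(data) : JSON.stringify(data))
    // Nothing more to report once the job reaches 100.
    if (data === 100) events.removeListener("progress", onProgress)
  }
  events.addListener("progress", onProgress)
  return () => {
    events.removeListener("progress", onProgress)
  }
}

// Usage with a stand-in emitter instead of a real queue.
const events = new EventEmitter()
const received: string[] = []
const stop = watchJob(events, "a", (value) => received.push(value))

events.emit("progress", { jobId: "a", data: 50 })
events.emit("progress", { jobId: "b", data: 75 }) // different job, ignored
events.emit("progress", { jobId: "a", data: 100 }) // listener removes itself
events.emit("progress", { jobId: "a", data: 100 }) // no listener left
stop() // safe to call even after the listener removed itself
```

Returning the cleanup function mirrors what the loader hands back to eventStream, so the listener is removed whether the job finishes or the client disconnects first.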

In any route that needs to read this progress, you can use the useEventSource hook to subscribe to the event source.

This works like useState, except the value updates automatically whenever the event source sends a new event. It is null until the first event arrives.

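import { useEventSource } from "remix-utils"

export default function Route() {
  // Placeholder id; in a real route this would come from loader data.
  const jobId = "123"

  // Latest "progress" event data, or null before the first event.
  const progress = useEventSource(
    `/jobs/${jobId}/progress`,
    {
      event: "progress",
    },
  )

  return (
    <div>
      <h1>Job Progress</h1>
      <p>Progress: {progress}</p>
    </div>
  )
}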
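The value from useEventSource is a string, or null before any event has arrived, so it usually needs converting before it can drive something like a progress bar. A minimal sketch, assuming the server sends plain numeric progress; toPercent is a hypothetical helper name.

```typescript
// Hypothetical helper: turns the raw event data into a number
// between 0 and 100 that is safe to use as a bar width.
function toPercent(raw: string | null): number {
  // No event received yet.
  if (raw === null) return 0
  const value = Number(raw)
  // Non-numeric payloads (for example JSON objects) fall back to 0.
  if (!Number.isFinite(value)) return 0
  return Math.min(100, Math.max(0, value))
}
```

In the component above, this could be used as `toPercent(progress)` to set the value of a progress element.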