Jacob Paris
← Back to all content

Stream BullMQ job progress with Remix EventSources

When we think about streaming, we often think about streaming video or audio. But streaming can be used for more than just media. It can be used to stream data from a server to a client. In this guide, we'll look at how to stream the progress of a BullMQ job to the client.

BullMQ is a modern, fast, and robust queue system for Node.js. It is a successor to the popular Bull library, and is built on top of Redis. If you've integrated BullMQ into your app, you can create a resource route that streams the progress of a running job to the client.

Create a new resource route at app/routes/jobs.$id.progress.tsx

This route will use the eventStream utility from remix-utils to create an event source that will listen for the progress event from the job queue, and send the progress to the client.

tsx
import { processItemQueue } from "~/workers/processItem.server.ts"
import { eventStream } from "remix-utils/sse/server"
export async function loader({
request,
params,
}: LoaderFunctionArgs) {
const id = params.id as string
if (!id) {
return new Response("Not found", { status: 404 })
}
const job = await processItemQueue.getJob(id)
if (!job) {
return new Response("Not found", { status: 404 })
}
return eventStream(request.signal, function setup(send) {
job.isCompleted().then((completed) => {
if (completed) {
send({ event: "progress", data: String(100) })
}
})
processItemQueue.events.addListener(
"progress",
onProgress,
)
function onProgress({
jobId,
data,
}: {
jobId: string
data: number | object
}) {
if (jobId !== id) return
send({ event: "progress", data: String(data) })
if (data === 100) {
console.log("progress is 100, removing listener")
processItemQueue.events.removeListener(
"progress",
onProgress,
)
}
}
return function clear() {
processItemQueue.events.removeListener(
"progress",
onProgress,
)
}
})
}

In any route that needs to read this progress, you can use the useEventSource hook to subscribe to the event source.

This is like useState but will automatically update the state when the event source sends a new event.

tsx
import { useEventSource } from "remix-utils/sse/react"
export default function Route() {
const jobId = "123"
const progress = useEventSource(
`/jobs/${jobId}/progress`,
{
event: "progress",
},
)
return (
<div>
<h1>Job Progress</h1>
<p>Progress: {progress}</p>
</div>
)
}
Professional headshot
Moulton
Moulton

Hey there! I'm a developer, designer, and digital nomad building cool things with Remix, and I'm also writing Moulton, the Remix Community Newsletter

About once per month, I send an email with:

  • New guides and tutorials
  • Upcoming talks, meetups, and events
  • Cool new libraries and packages
  • What's new in the latest versions of Remix

Stay up to date with everything in the Remix community by entering your email below.

Unsubscribe at any time.