New update:
Initialize a queue in the main application thread

By default, queues are initialized in a sandboxed process. This allows utilizing multiple CPU cores when processing jobs, and they can’t block the main thread. So this is great for long-running jobs or CPU-intensive tasks.
I added the option to create queues that run jobs in the main application thread. This is, for example, useful if you want to emit socket events.
Since you can schedule jobs inside other jobs here is a great use case:
- Create a queue that runs in the same thread. Let’s call the queue socket. Tick the Autostart and Same Thread checkboxes when creating the queue
- Next, create a queue that runs in a sandboxed process. Let’s call it long_job.
- Now, you can add a socket emit to the first queue and a long-running job to the second queue. At the end of the long-running job in the second queue, schedule a job in the first queue.
That lets you pass data to the main thread. So you have the benefits of nonblocking sandboxed processing while running a quick emit in the main thread when the job is done.
Autostarting Queues
Autostarting queues were initialized automatically inside your routes. Since the sandboxed jobs always create a new app instance in a separate thread, this lead to the queues being initialized again whenever a sandboxed job is processed. So the middleware had to be moved into server.js. There is currently no way to do this via extension, but if you need autostarting queues, then here is how I do that:
- Create a folder
scripts
in the root of the project. Add the file startQueues.js
inside the folder.
Add the following content to the file:
const App = require("../lib/core/app");
const ioredis = require("ioredis");
const bullQueuesModule = require("../extensions/server_connect/modules/bull_queues");
const { logMessage } = require("../extensions/server_connect/modules/advanced-logger");
function getRedisInstance() {
return new ioredis({
port: process.env.REDIS_PORT || (global.redisClient ? global.redisClient.options.port : 6379),
host: process.env.REDIS_HOST || (global.redisClient ? global.redisClient.options.host : 'localhost'),
db: process.env.REDIS_BULL_QUEUE_DB || 3,
password: process.env.REDIS_PASSWORD || (global.redisClient ? global.redisClient.options.password : undefined),
username: process.env.REDIS_USER || (global.redisClient ? global.redisClient.options.user : undefined),
tls: process.env.REDIS_TLS || (global.redisClient ? global.redisClient.options.tls : undefined),
});
}
const startQueues = async() => {
const redis = getRedisInstance();
const appInstance = new App();
try {
const queueKeys = await redis.keys("autostartqueues:*");
if (queueKeys.length) {
for (let queueKey of queueKeys) {
const optionsString = await redis.get(queueKey);
if (optionsString) {
const options = JSON.parse(optionsString);
await bullQueuesModule.create_queue.bind(appInstance)(options);
}
}
await logMessage({
message: "Queues successfully initialized on server start",
log_level: "info",
});
}
} catch (error) {
await logMessage({
message: `Failed to initialize queues on server start: ${error.message}`,
log_level: "error",
details: error,
});
}
};
module.exports = startQueues;
- Next edit the file
lib/server.js
Right after
global.io = io;
and before
module.exports = {
add
//Start queues
const startQueues = require('../scripts/startQueues');
After
start: function(port) {
add
//startQueues
startQueues();
It should look like this:
const server = http.createServer(app);
const io = sockets(server, session);
// Make sockets global available
global.io = io;
//Start queues
const startQueues = require('../scripts/startQueues');
module.exports = {
server,
app,
io,
start: function(port) {
//startQueues
startQueues();
// We add the 404 and 500 routes as last
app.use((req, res) => {
res.status(404).json({
status: '404',
message: `${req.url} not found.`
});
});
app.use((err, req, res, next) => {
debug(`Got error? %O`, err);
res.status(500).json({
status: '500',
code: config.debug ? err.code : undefined,
message: config.debug ? err.message || err : 'A server error occured, to see the error enable the DEBUG flag.',
stack: config.debug ? err.stack : undefined,
});
});
cron.start();
server.listen(port || config.port, () => {
console.log(`App listening at http://localhost:${config.port}`);
});
}
};
@George It would be great if I could do that via extensions so that the setup becomes easier for the community.