🦬 Bull Queues for Node

This module extension for Server Connect allows the use of Bull Queues to execute Library actions in the background.

Links to latest versions in comments below.

:beer: The module is free to all, and I’ll gladly accept any donations to support! https://www.buymeacoffee.com/uniqueideas :beer:

Let me know if you have any issues, or requests for further development!

Special thanks to @patrick for his help, as well as @kfawcett for being the guinea pig! Also to @JonL and @sid and all the other people posting information about custom modules as I definitely pulled tidbits from many places.

Why would you want this?

Sometimes it is preferable to hand a task off to be run in the background. For example, if the task will take a long time to run, you do not want your users to be stuck with a spinner forever. You can now hand that task off to a queue, and let it run in the background. When finished, you can notify the user if necessary via websocket, etc.

Or perhaps the task is not critical to the user flow, so you can hand it off to a queue and deal with retries, errors, etc.

Functionality

Allows for the creation of one or more queues to offload the execution of Wappler library tasks by using Bull Queues.

Requirements

  • Functioning Redis connection, specified within the Wappler server config
  • Wappler project using Node server model

Installation

  • In your project folder, create /extensions/server_connect/modules (if it does not yet exist)
  • Unzip the source code into /extensions/server_connect/modules (3 files)
  • Refresh the Server Actions panel (restarting Wappler is also an option)
  • The required libraries will be installed automatically upon use and next deployment
  • You should now have a Bull Queues group in your available actions list for server workflows

Actions

All actions require a queue name be provided

Create Queue

  • Creates a queue with optional parameters
  • Responds with message indicating result

Add Job

  • Add a job into a queue
  • The job will execute the specified Library File, passing it the PARAM values provided
  • The job id is also provided to the library and can be accessed using $_PARAM.id
  • Optionally create a queue with the default set of parameters (see below)
  • Responds with the job id

Queue Status

  • Returns the job counts for the specified queue (active, completed, waiting, delayed, etc.)

Job State

  • Returns the job details for a given job id, along with the current status

Destroy Queue

  • Forcibly destroys a given queue
  • Removes any and all jobs from the queue (any jobs currently running will complete)
  • Resets the job id back to 1

Queue Parameters

  • Queue name - Identifies a unique queue; every action uses it to select the queue to work on
  • Number of concurrent jobs - The number of jobs that can be run in parallel for this queue
  • Max jobs - The maximum number of jobs to be run within a given duration
  • Duration for max jobs - Number of milliseconds used when calculating max jobs
  • The default parameters are 5 concurrent jobs, and no rate limiting (no max jobs, and no duration)

Rate limiting

By using the max jobs and duration parameters, a queue can limit how quickly jobs are processed. For example, if the library uses an external API that limits usage to 1 request per second, the queue can be configured with Max jobs = 1 and Duration = 1000.
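As a rough illustration of how these parameters relate to Bull itself: Bull accepts a `limiter` option with `max` and `duration` when a queue is constructed, and a concurrency value when a processor is attached. The helper name `buildQueueSettings` and the queue name `external-api` are made up for this sketch, and this is not necessarily how the extension wires things internally.

```javascript
// Hypothetical helper (not part of the extension): maps the queue parameters
// described above onto the option shape that Bull's Queue constructor accepts.
function buildQueueSettings({ concurrency = 5, maxJobs = null, duration = null } = {}) {
  const settings = { concurrency, queueOptions: {} };
  // Only add a limiter when both values are provided; otherwise no rate limiting.
  if (maxJobs !== null && duration !== null) {
    settings.queueOptions.limiter = { max: maxJobs, duration };
  }
  return settings;
}

// 1 request per second, as in the example above.
const apiQueue = buildQueueSettings({ maxJobs: 1, duration: 1000 });
console.log(apiQueue);

// With Bull installed and Redis reachable, these could be applied roughly as:
//   const Queue = require('bull');
//   const queue = new Queue('external-api', apiQueue.queueOptions);
//   queue.process(apiQueue.concurrency, async (job) => { /* run the task */ });
```

Calling `buildQueueSettings()` with no arguments gives the defaults listed above: 5 concurrent jobs and no limiter.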

This is an amazing feature and will be useful to almost anyone pulling data in from APIs or sending out large amounts of data (e.g. emails). Bull should retry failed transactions and not pause the user’s experience since the data processing is moved out of the Server Action that you most likely used to kick off the process from the UI.

As an example: I’m connecting to an API that fails pretty often and only allows one request per second. During each run I need the user to provide a value and then I need to pull 10,000 records from a third-party API, so having a job management system like Bull is essential. Working with @mebeingken, he was able to set up some features that let me limit the number of records that can be processed by the queue I set up for this specific use case. In the future I can set up other queues that may be able to process more transactions per second without affecting the queues set up for other processes.

Great job! That is one hell of an extension!

I feel that any amount of beers I send you will look ridiculous next to Keith’s truckload of them.

I don’t have a use for queues right now but it’s just a matter of time. This kind of effort should be acknowledged and rewarded. So I might be able to send some six-packs plus work on bugs/feature PRs if you accept them.

Yes, Keith was extremely generous, especially given how little I drink! :wink:

Happy to accept PRs Jon…not only for the added bandwidth of dev, but also to correct my pleb ways of doing things! :rofl:

You live in Cali, so you could have said :evergreen_tree::evergreen_tree::evergreen_tree: and I would have given even more. :smiley:

You can now delay a job a minimum amount of time – In other words, don’t run the job for x number of milliseconds.
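In Bull, a delay is expressed as the `delay` job option, in milliseconds. A minimal sketch of preparing that option follows; the helper name `delayedJobOptions` is hypothetical, and the extension's own field names may differ.

```javascript
// Hypothetical helper: builds the options object for a delayed job.
function delayedJobOptions(delayMs) {
  if (!Number.isInteger(delayMs) || delayMs < 0) {
    throw new RangeError('delay must be a non-negative whole number of milliseconds');
  }
  return { delay: delayMs };
}

const opts = delayedJobOptions(30000); // do not run for at least 30 seconds
console.log(opts);

// With Bull installed, the options would be passed when adding the job:
//   queue.add({ some: 'data' }, opts);
```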

Awesome Ken! Thanks for sharing!

Big thanks @mebeingken, looking forward to using this in the future and giving feedback if any is needed.

New actions

Clean queue

  • Removes jobs from a queue by job status
  • Optionally set grace period to retain newer jobs while deleting old
  • Job status choices: Completed, Delayed, Failed (Active jobs cannot be removed, and Waiting is not supported by the Bull queue library)
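To make the grace period concrete, here is a small in-memory illustration of the rule: a job is removed only if it has the chosen status and is older than the grace period. This is not the extension's code (Bull performs the real cleanup in Redis through `queue.clean(grace, status)`), and the `status` and `finishedOn` field names are assumptions for the sketch.

```javascript
// In-memory illustration only; the real cleanup happens inside Bull and Redis.
// `finishedOn` is a millisecond timestamp, a field name assumed for this sketch.
function cleanJobs(jobs, status, graceMs, now = Date.now()) {
  return jobs.filter((job) => {
    const matches = job.status === status;
    const oldEnough = now - job.finishedOn > graceMs;
    return !(matches && oldEnough); // keep everything that is not removable
  });
}

const now = 100000;
const jobs = [
  { id: 1, status: 'completed', finishedOn: now - 60000 }, // old, removed
  { id: 2, status: 'completed', finishedOn: now - 1000 },  // inside grace period, kept
  { id: 3, status: 'failed', finishedOn: now - 60000 },    // different status, kept
];
const remaining = cleanJobs(jobs, 'completed', 5000, now);
console.log(remaining.map((job) => job.id)); // [ 2, 3 ]
```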

Add Job API

  • Same as Add Job action, but triggers an API instead of a Library

Get jobs

  • Retrieve jobs by job status from a specified queue
  • Job choices: Active, Failed, Waiting, Delayed, Completed

Existing actions

Add Job

  • Optionally remove completed jobs from the queue
  • NOTE: The Add Job action currently does not work if it is executed from a previously submitted job. In other words, if you are doing an iteration where you submit new jobs from a previously submitted job, it will fail. Use the Add Job API instead.

We have come far since our bubble days, huh? It’s been the journey!
Try implementing this there :smiley:

No doubt, but also, I have so far to go!

I’m getting deeper and deeper into javascript and there is sooo much I don’t know. Patrick will kindly explain something or provide a sample, and I can’t even form a proper question. :rofl:

Potential Breaking Changes

  • The code now uses DB 2 of Redis, in order to keep it separate from Wappler’s session DB. This can easily be changed to any Redis DB number inside the bull_queues.js file.
  • Queue names “under the hood” were previously different than the specified name – the same name is now used.
  • Adding jobs previously allowed for the creation of queues; this has been removed. The new paradigm is that jobs may always be added to queues, and will wait for a Create Queue action, which attaches the job processor. This better follows the Bull separation of Consumers and Producers.
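The producer/consumer split described in the last point can be pictured with a toy in-memory queue. This is not Bull and not the extension's code; the class `ToyQueue` and the queue name `emails` are invented for illustration. It only shows the behaviour: jobs can always be added, but they sit in "waiting" until a processor is attached to a queue of the same name.

```javascript
// Toy in-memory queue, NOT Bull: jobs can be added at any time, but nothing runs
// until a processor is attached (the role the Create Queue action plays).
class ToyQueue {
  constructor(name) {
    this.name = name;
    this.waiting = [];
    this.processor = null;
    this.results = [];
  }

  // Producer side: always accepted, even with no processor attached.
  add(data) {
    this.waiting.push(data);
    this.drain();
  }

  // Consumer side: attaching a processor starts work on anything waiting.
  process(processor) {
    this.processor = processor;
    this.drain();
  }

  drain() {
    if (!this.processor) return; // no consumer yet, jobs stay in "waiting"
    while (this.waiting.length > 0) {
      this.results.push(this.processor(this.waiting.shift()));
    }
  }
}

const queue = new ToyQueue('emails');
queue.add({ to: 'a@example.com' });
queue.add({ to: 'b@example.com' });
console.log(queue.waiting.length); // 2, nothing has run yet

queue.process((data) => `sent to ${data.to}`);
console.log(queue.waiting.length); // 0
console.log(queue.results);
```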

Other changes

  • Several modifications to how queues are utilized
  • General cleanup
  • Improved error reporting

New actions

Pause queue

  • A paused queue will not process new jobs until resumed, but current jobs being processed will continue until they are finalized.

Resume queue

  • Resumes processing of new jobs on a queue.

Existing actions

Create Queue

  • Must specify a processor type (Library or API). If jobs are waiting in the queue, they must be of the same processing type, or they will fail when attempted.
  • Improved display of throttling options

Add Job

  • Removed the option to create a queue. Jobs may now be added to a queue before creation and will wait for a Create Queue action to begin processing.
  • Set the number of attempts to be made to process a job.
  • When a queue has been created through the Create Queue action, an error will be generated if attempting to add a job to an API type of queue.

Add Job API

  • Removed the option to create a queue. Jobs may now be added to a queue before creation and will wait for a Create Queue action to begin processing.
  • Set the number of attempts to be made to process a job.
  • When a queue has been created through the Create Queue action, an error will be generated if attempting to add a job to a Library type of queue.
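The number of attempts corresponds to Bull's `attempts` job option, passed as `queue.add(data, { attempts: 3 })`. What that means in practice can be sketched with a toy retry loop; the function name `runWithAttempts` is hypothetical, and this is a simplified synchronous stand-in, not how Bull implements retries.

```javascript
// Toy retry loop, not Bull's implementation: call `task` until it succeeds
// or the allowed number of attempts is used up.
function runWithAttempts(task, attempts) {
  if (!Number.isInteger(attempts) || attempts < 1) {
    throw new RangeError('attempts must be a positive whole number');
  }
  let lastError;
  for (let attempt = 1; attempt <= attempts; attempt += 1) {
    try {
      return { result: task(attempt), attemptsMade: attempt };
    } catch (error) {
      lastError = error; // remember the failure and try again
    }
  }
  throw lastError; // every attempt failed
}

// A task that fails twice before succeeding.
const flaky = (attempt) => {
  if (attempt < 3) throw new Error(`attempt ${attempt} failed`);
  return 'ok';
};

console.log(runWithAttempts(flaky, 3)); // { result: 'ok', attemptsMade: 3 }
```

With only 2 attempts allowed, the same task would end up failed, which is the point at which a job in a real queue would land in the Failed status.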

Queue Status

  • Improved the data binding of results in order to properly select response in downstream actions.

Destroy Queue

  • Now closes the Redis connection as well as emptying the queue.

Dear @mebeingken Ken,

I’m trying to use your awesome extension, but I think I’m not understanding how it works!
I created one action as library:

I’m executing this action from another one, because I need to receive form parameters to run Wappler API action:

When running, I see this in Chrome Dev Tools:

They are all in “waiting” status…I tried to “resume”, but nothing happens :frowning:

Could you help me?

All the best!

To start, continue using the Add Job API action instead of using the library action. I started seeing some issues with the library side of things, and haven’t had time to come back and sort it out.

It is important that the Queue name (not the name of the action, the name of the actual Queue) is the same in your Create Queue action and in your Add Job API actions. The Create Queue action attaches workers to process your jobs. If the queue names do not match, the jobs will sit and wait for you to attach workers.

Also, why are you cleaning the queue while you are creating and submitting jobs? If you tick the Remove on complete option when submitting the job, it will clean itself…I rarely have the Clean action in my workflows.

To get started, just use the Create Queue and Add Job API actions to see things work and then get more complicated later on.

And make sure you have the latest version of the extension.

Thanks!! Will try asap!

Hey @mebeingken!!

It’s working perfectly on my dev environment…but when I deploy to my staging server, the execution fails with Error: connect ECONNREFUSED 152.243.184.158:3000

Any tip?

Thanks!

For anybody using this extension, I believe the 4.9.1 update has created some issues for the existing module. More research is needed, but I had to roll back the latest update as child processes were going wild.

Other changes

  • Improved tracking of worker counts

New actions

Retry job

  • Allows resubmitting a job for processing via job id

Existing actions

Add Job API

  • Removed erroneous inclusion of require /lib/core/app which triggered issues on the release of Wappler 4.9.1
  • Place entire child process into try/catch to trap errors
  • Improved base url functionality to support more configurations by dynamically using request protocol
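The idea behind the base URL change can be sketched as follows, assuming an Express-style request object with `protocol` and `headers.host`. The helper name `baseUrlFromRequest` and the host `staging.example.com` are hypothetical, and the extension's actual code may build the URL differently.

```javascript
// Hypothetical helper: derive the base URL for the API call from the incoming
// request instead of hard-coding a protocol or a fixed host and port.
function baseUrlFromRequest(req) {
  // `protocol` and `headers.host` follow the shape of an Express request object.
  return `${req.protocol}://${req.headers.host}`;
}

const fakeRequest = { protocol: 'https', headers: { host: 'staging.example.com' } };
console.log(baseUrlFromRequest(fakeRequest)); // https://staging.example.com
```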

@mebeingken

Just trying to install this extension, I had to enable Redis and I’ve just added the ‘create queue’ action.

It’s throwing this error - I’m guessing something went wrong with either installing the module or redis?

message: "Cannot read properties of undefined (reading 'ready')"
stack: "TypeError: Cannot read properties of undefined (reading 'ready')\n    at Object.<anonymous> (/opt/node_app/extensions/server_connect/modules/bull_queues.js:8:39)\n    at Module._compile (node:internal/modules/cjs/loader:1126:14)\n    at Object.Module._extensions..js (node:internal/modules/cjs/loader:1180:10)\n    at Module.load (node:internal/modules/cjs/loader:1004:32)\n    at Function.Module._load (node:internal/modules/cjs/loader:839:12)\n    at Module.require (node:internal/modules/cjs/loader:1028:19)\n    at require (node:internal/modules/cjs/helpers:102:18)\n    at App._exec (/opt/node_app/lib/core/app.js:473:30)\n    at App._exec (/opt/node_app/lib/core/app.js:458:28)\n    at processTicksAndRejections (node:internal/process/task_queues:96:5)"
status: "500"

Edit: I think related to my redis installation. Error seems to be gone

So you are all set? Let me know if any questions.
