official MongoDB documentation.
Cloud-based MongoDB: Sign up for MongoDB Atlas and create a cluster. Atlas offers a free tier for development purposes.
Once your MongoDB instance is ready, note your connection string. You'll need it to connect Agenda.js to your database.
If you haven't already, create a new Node.js project and initialize it:
mkdir my-agenda-app
cd my-agenda-app
npm init -y
Install Agenda.js and the MongoDB driver by running:
npm install agenda mongodb
Now, you'll set up Agenda.js in your Node.js application. Create a file named agendaSetup.js and initialize Agenda with your MongoDB connection:
const Agenda = require('agenda');

const connectionOpts = {
  db: { address: 'mongodb://localhost:27017/agendaDb', collection: 'jobs' },
  processEvery: '30 seconds'
};

const agenda = new Agenda(connectionOpts);

module.exports = agenda;
If you're using a different database or host, replace 'mongodb://localhost:27017/agendaDb' with your MongoDB connection string.
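For example, when deploying against MongoDB Atlas you might read the connection string from an environment variable instead of hard-coding it. A minimal sketch (the MONGODB_URI variable name is just an assumption, not something Agenda requires):

const Agenda = require('agenda');

// Fall back to the local database when MONGODB_URI is not set (hypothetical variable name)
const connectionString = process.env.MONGODB_URI || 'mongodb://localhost:27017/agendaDb';

const agenda = new Agenda({
  db: { address: connectionString, collection: 'jobs' },
  processEvery: '30 seconds'
});

module.exports = agenda;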
With Agenda, you define jobs by specifying a name and a function that gets called when the job is run. In the same or a different file, define a job like so:
const agenda = require('./agendaSetup');

agenda.define('say hello', async job => {
  console.log('Hello, World!');
});
To schedule jobs, start Agenda and then schedule your defined jobs as needed. You can do this in an app.js file or at the end of your agendaSetup.js file:
(async function() { // IIFE to use async/await
  await agenda.start();
  await agenda.every('1 hour', 'say hello');
  console.log('Job scheduled to say hello every hour.');
})();
Run your application using Node.js:
node app.js
In your sign-up endpoint, call Agenda to schedule the job:
app.post('/signup', async (req, res) => {
  const { email } = req.body;
  // Here you would add logic to save the user to your database
  // Schedule the 'send welcome email' job
  await agenda.schedule('in 2 minutes', 'send welcome email', { email });
  res.status(200).send('User signed up successfully, welcome email scheduled.');
});
Then define the corresponding job:
// Define the 'send welcome email' job
agenda.define('send welcome email', async job => {
  const { email } = job.attrs.data;
  console.log(`Sending welcome email to ${email}`);
  // Here you would integrate with your email service
});
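To send a real email, you could plug an email library into the job body. The sketch below uses Nodemailer (npm install nodemailer); the SMTP host, credentials, and sender address are placeholders, not values Agenda provides:

const nodemailer = require('nodemailer');

// Placeholder SMTP transport; replace with your provider's settings
const transporter = nodemailer.createTransport({
  host: 'smtp.example.com',
  port: 587,
  auth: { user: process.env.SMTP_USER, pass: process.env.SMTP_PASS }
});

agenda.define('send welcome email', async job => {
  const { email } = job.attrs.data;
  await transporter.sendMail({
    from: '"My App" <no-reply@example.com>', // placeholder sender address
    to: email,
    subject: 'Welcome!',
    text: 'Thanks for signing up.'
  });
  console.log(`Welcome email sent to ${email}`);
});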
Command Query Responsibility Segregation (CQRS) is a software architectural pattern that separates the operations of reading data (queries) from the operations of updating data (commands), allowing them to scale independently and optimize performance, complexity, and security for each operation type.
Integrating job queues with the CQRS pattern can enhance its effectiveness, particularly on the command side of the architecture. This integration brings several benefits, improving the system's scalability, reliability, and responsiveness.
CQRS is based on the principle that the models used to update information do not have to be the same as those used to read information. This separation allows for system design flexibility and can improve performance and scalability. The pattern fits well with event-driven architectures and domain-driven design (DDD), where it can provide clear boundaries and responsibilities within the system.
Improved Scalability: By using job queues to handle commands, you can offload the execution of these commands to background workers. This allows the system to handle a high volume of write requests more efficiently by spreading the load across multiple workers and resources, enhancing the scalability of the command model.
Enhanced Performance: Separating commands and queries allows each to be optimized for specific roles. Job queues can further optimize command execution by ensuring that write operations do not block read operations, thus improving the application's overall performance.
Increased Reliability and Fault Tolerance: Job queues can automatically retry failed commands, improving the system's reliability. This is particularly important for operations that must not fail, such as financial transactions or critical data updates. Using job queues ensures that commands can be retried or postponed until they can be completed.
Asynchronous Processing: Integrating job queues allows commands to be processed asynchronously, significantly improving the user experience by making the UI more responsive. Users can receive immediate feedback for their actions, even if the underlying command is processed in the background.
Event Sourcing Compatibility: CQRS often complements Event Sourcing, where changes to the application state are stored as a sequence of events. Job queues can efficiently handle generating and processing these events, facilitating a robust event-driven architecture.
Command Handling: In a CQRS-based system integrated with job queues, commands are dispatched to the job queue instead of being executed directly. This decouples the command's issuance from its execution, allowing for more flexible and scalable processing.
Consistency: While job queues and CQRS can improve performance and scalability, they also introduce eventual consistency into the system. This means a query might not immediately reflect the results of a recently issued command. Designing your system to handle or mitigate the effects of eventual consistency is crucial.
Error Handling: Robust error handling and retry mechanisms should be in place to manage failed commands during execution. This ensures that the system can recover gracefully from errors without losing data or corrupting the application state.
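With Agenda.js specifically, one way to implement such a retry policy is to listen for the fail event and reschedule the job with a back-off. A minimal sketch (the retry limit and delay are arbitrary choices for illustration):

agenda.on('fail', async (err, job) => {
  const failCount = job.attrs.failCount || 0;
  if (failCount <= 3) {
    // Push the failed command back onto the queue with a delay
    console.warn(`Job "${job.attrs.name}" failed (attempt ${failCount}): ${err.message}. Retrying in 5 minutes.`);
    job.schedule('in 5 minutes');
    await job.save();
  } else {
    // Give up after a few attempts; alerting or a dead-letter store could go here
    console.error(`Job "${job.attrs.name}" failed permanently after ${failCount} attempts.`);
  }
});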
To demonstrate a minimal reproducible example of a CQRS architecture using Express and Agenda.js, let's create a simple application. This app will have a command to "create a user" and a query to "get user details". The "create a user" command will be processed asynchronously using Agenda.js.
Initialize a new Node.js project (if you haven't already):
mkdir cqrs-agenda-example
cd cqrs-agenda-example
npm init -y
npm install express agenda mongodb body-parser
Set up Express and Agenda.js (app.js):
const express = require('express');
const bodyParser = require('body-parser');
const { MongoClient } = require('mongodb');
const Agenda = require('agenda');

const app = express();
const port = 3000;

app.use(bodyParser.json());

const mongoConnectionString = 'mongodb://127.0.0.1/agenda';

// Initialize MongoDB connection and Agenda
const client = new MongoClient(mongoConnectionString);
const agenda = new Agenda({ db: { address: mongoConnectionString } });

// Placeholder for users' data storage
const users = {};

// Define a job for creating a user in Agenda
agenda.define('create user', async (job) => {
  const { userId, userName } = job.attrs.data;
  // Simulate user creation delay
  await new Promise(resolve => setTimeout(resolve, 1000));
  users[userId] = { userId, userName };
  console.log(`User created: ${userName}`);
});

(async function() { // Self-invoking async function to ensure proper startup
  await client.connect();
  await agenda.start();
  console.log('Agenda started');
})();

// Command API to create a user
app.post('/users', async (req, res) => {
  const { userId, userName } = req.body;
  await agenda.schedule('in 2 seconds', 'create user', { userId, userName });
  res.send({ message: `User creation scheduled for ${userName}` });
});

// Query API to get a user
app.get('/users/:userId', (req, res) => {
  const { userId } = req.params;
  const user = users[userId];
  if (user) {
    res.send(user);
  } else {
    res.status(404).send({ message: 'User not found' });
  }
});

app.listen(port, () => {
  console.log(`Example app listening at http://localhost:${port}`);
});
MongoDB and Agenda Setup: This example connects to MongoDB, initializes Agenda with the connection, and defines a job for creating a user. The users object acts as a simple in-memory store.
Command Endpoint: The POST /users endpoint receives a userId and userName, schedules a "create user" job with Agenda, and responds immediately, acknowledging the scheduling.
Query Endpoint: The GET /users/:userId endpoint looks up and returns the user's details from the in-memory store. If the user doesn't exist, it returns a 404 error.
Asynchronous Job Processing: The "create user" job simulates a delay, mimicking a time-consuming task like sending a welcome email or processing additional data. Once the job runs, it adds the user to the in-memory store.
Running the Example
Make sure MongoDB is running locally.
Start your application with node app.js.
Use a tool like Postman or curl to test the command and query endpoints:
To create a user: POST http://localhost:3000/users with JSON body {"userId": "1", "userName": "John Doe"}.
To get a user: GET http://localhost:3000/users/1.
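For example, with curl (assuming the server runs on the default port 3000):

# Schedule the "create user" command
curl -X POST http://localhost:3000/users \
  -H "Content-Type: application/json" \
  -d '{"userId": "1", "userName": "John Doe"}'

# Query the user a couple of seconds later, once the job has run
curl http://localhost:3000/users/1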
This example illustrates a basic CQRS pattern with asynchronous command processing using Express and Agenda.js.
It demonstrates how commands can be handled separately from queries, allowing for more scalable and responsive applications.
For this example, we'll design a simple CQRS-based application that schedules web scraping tasks using Playwright, tracks the status of these jobs, and retrieves their results.
This will involve creating a command to schedule a scraping job, and queries to check job status and get results. We'll use Express.js for the web server, Agenda.js for job queueing, and Playwright for web scraping.
Initialize a new Node.js project:
mkdir cqrs-scraping
cd cqrs-scraping
npm init -y
npm install express agenda mongodb body-parser playwright
Set up Express and Agenda.js (server.js):
const express = require('express');
const bodyParser = require('body-parser');
const { MongoClient } = require('mongodb');
const Agenda = require('agenda');
const { chromium } = require('playwright');

const app = express();
app.use(bodyParser.json());

const mongoConnectionString = 'mongodb://127.0.0.1/agenda';
const agenda = new Agenda({ db: { address: mongoConnectionString } });

const jobsResult = {}; // Store job results keyed by job ID

// Define a job for web scraping
agenda.define('web scraping', async (job) => {
  const { url } = job.attrs.data;
  const browser = await chromium.launch();
  const page = await browser.newPage();
  await page.goto(url);
  const content = await page.content(); // Simplified scraping logic
  await browser.close();
  // Store result with job ID for retrieval
  jobsResult[job.attrs._id] = content;
  console.log(`Scraping completed for job ${job.attrs._id}`);
});

(async function() {
  await agenda.start();
  console.log('Agenda started');
})();

// Endpoint to schedule web scraping
app.post('/scrape', async (req, res) => {
  const { url } = req.body;
  const job = await agenda.now('web scraping', { url });
  res.send({ message: 'Scraping job scheduled', jobId: job.attrs._id });
});

// Endpoint to check job status
app.get('/status/:jobId', (req, res) => {
  const { jobId } = req.params;
  if (jobsResult[jobId]) {
    res.send({ status: 'Completed' });
  } else {
    res.send({ status: 'In Progress' });
  }
});

// Endpoint to get job result
app.get('/result/:jobId', (req, res) => {
  const { jobId } = req.params;
  const result = jobsResult[jobId];
  if (result) {
    res.send({ result });
  } else {
    res.status(404).send({ message: 'Result not found' });
  }
});

const port = 3000;
app.listen(port, () => console.log(`Server running on port ${port}`));
Start MongoDB locally if it's not running already.
Run the server script:
node server.js
Schedule a web scraping job by sending a POST request to /scrape with a JSON body containing the URL to scrape.
Check the job status by sending a GET request to /status/:jobId using the job ID returned from the previous step.
Retrieve the job result by sending a GET request to /result/:jobId once the job is completed.
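For example, with curl (https://example.com and <jobId> below are placeholders; use the jobId returned by the /scrape response):

# Schedule a scraping job
curl -X POST http://localhost:3000/scrape \
  -H "Content-Type: application/json" \
  -d '{"url": "https://example.com"}'

# Check status and fetch the result once completed
curl http://localhost:3000/status/<jobId>
curl http://localhost:3000/result/<jobId>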
Job queues and the Command Query Responsibility Segregation (CQRS) pattern represent powerful architectural choices that can significantly enhance the scalability, performance, and maintainability of software systems, especially in complex, distributed environments like microservices.
When implemented thoughtfully, these patterns facilitate a high degree of decoupling between components, allowing for more granular scaling, improved fault tolerance, and greater flexibility in responding to changing requirements or workloads.