Colas de trabajos & CQRS - El patron que necesitas para escalar a millones de request / minuto

Santiago Quinteros - CEO & CTO - Software on the road
By:
Santiago Quinteros

Colas de Trabajos

Una cola de trabajos es un mecanismo para administrar y procesar tareas o "trabajos" de manera asíncrona. Permite a las aplicaciones delegar tareas que no son inmediatamente necesarias para el flujo de trabajo principal a un servicio o proceso separado, el cual maneja estas tareas en segundo plano.

Este enfoque es beneficioso para tareas que son intensivas en recursos, consumen mucho tiempo o no son críticas para la respuesta inmediata del sistema, como enviar correos electrónicos, generar informes o realizar procesamiento de datos por lotes.

Cómo Funciona

  1. Creación de Trabajos: La aplicación crea un trabajo para realizar una tarea específica. Este trabajo se añade a la cola, esencialmente una lista de tareas esperando ser procesadas.

  2. Gestión de la Cola: La cola de trabajos mantiene los trabajos hasta que puedan ser procesados. Los trabajos pueden ser priorizados, retrasados o programados para ejecución futura basados en las necesidades de la aplicación.

  3. Procesos Workeres: Procesos separados o "workeres" monitorean continuamente la cola de trabajos en busca de nuevos trabajos para procesar. Una vez que un worker elige un trabajo de la cola, ejecuta la tarea asociada con ese trabajo.

  4. Procesamiento de Trabajos: El worker procesa el trabajo según la tarea especificada. Este procesamiento ocurre de manera asíncrona, significando que la aplicación puede continuar operando y respondiendo a otras solicitudes mientras el trabajo está siendo manejado en segundo plano.

  5. Completitud y Callbacks: Una vez que un trabajo se completa, el worker puede notificar a la aplicación de su finalización, actualizar su estado y, opcionalmente, activar callbacks o acciones de seguimiento.

Beneficios de Usar una Cola de Trabajos

  1. Mejora del Rendimiento de la Aplicación: Al delegar tareas pesadas o que consumen mucho tiempo a una cola de trabajos, el hilo principal de la aplicación permanece libre para manejar solicitudes entrantes, mejorando el rendimiento general y la capacidad de respuesta.

  2. Escalabilidad: Las colas de trabajos pueden distribuir tareas a través de múltiples workeres o servidores, facilitando la escalabilidad horizontal de la aplicación a medida que aumenta la carga de trabajo.

  3. Fiabilidad: Las colas de trabajos pueden implementar características como mecanismos de reintento para trabajos fallidos, asegurando que todas las tareas sean eventualmente procesadas incluso durante fallos temporales.

  4. Flexibilidad: Los desarrolladores pueden programar trabajos para ejecutarse en momentos específicos, gestionar la prioridad de las tareas y ajustar el número de workeres basados en la carga, ofreciendo un mayor control sobre cómo y cuándo se procesan los trabajos.

Worker

Un worker es un proceso o hilo dedicado cuyo propósito principal es monitorear una cola de trabajos y ejecutar las tareas o trabajos que encuentra.

Este concepto es central en arquitecturas de procesamiento asíncrono, donde tareas específicas son delegadas del flujo principal de la aplicación para ser procesadas en segundo plano, mejorando la eficiencia y la experiencia del usuario. Los workeres son la columna vertebral de este sistema, asegurando que las tareas se ejecuten de manera oportuna y ordenada.

Características Clave de un Worker

Operación Autónoma: Los workeres funcionan independientemente del proceso principal de la aplicación, a menudo en hilos separados o en diferentes servidores. Esta separación permite que la aplicación principal siga siendo receptiva a las solicitudes de los usuarios mientras los workeres manejan tareas intensivas en recursos o que consumen mucho tiempo.

Sondeo Continuo: Los workeres monitorean continuamente la cola de trabajos en busca de nuevas tareas. Una vez identificada una tarea, un worker la toma, la marca como en proceso y comienza la ejecución de la tarea asociada.

Ejecución de Tareas: Los workeres son responsables de ejecutar las tareas que toman de la cola. Estas tareas pueden variar desde enviar correos electrónicos, procesar archivos y generar informes, hasta cualquier otra operación asíncrona que se pueda realizar independientemente del flujo de trabajo principal de la aplicación.

Manejo de Errores y Reintentos:

Los workeres a menudo tienen mecanismos integrados para manejar errores o fallos. Si una tarea falla, un worker puede reintentar la tarea según reglas o políticas especificadas, asegurando que problemas temporales no lleven al fallo de la tarea.

Escalabilidad: Varios workeres pueden operar concurrentemente, permitiendo al sistema escalar agregando más workeres basados en el volumen de tareas en la cola. Esta escalabilidad asegura que el sistema pueda manejar cargas de trabajo crecientes de manera eficiente.

Cómo los Workeres Encajan en la Arquitectura del Sistema

En una arquitectura de cola de trabajos típica, los workeres juegan un papel crucial en equilibrar la carga y asegurar el funcionamiento suave del sistema.

Aquí está cómo encajan en la arquitectura general:

  1. Delegación de Tareas: La aplicación principal delega tareas a una cola de trabajos en lugar de ejecutarlas de manera síncrona, asegurando que la aplicación permanezca receptiva.

  2. Monitoreo: Los workeres monitorean constantemente esta cola, esperando que se añadan nuevas tareas.

  3. Procesamiento: Cuando un worker encuentra una nueva tarea en la cola, la recupera, la procesa y, una vez completada, la elimina de la cola o la marca como completada.

  4. Bucle de Retroalimentación: Después de procesar una tarea, los workeres pueden actualizar el sistema o la aplicación sobre el estado de la tarea, facilitando un bucle de retroalimentación donde el resultado de los procesos en segundo plano puede influir en el estado de la aplicación o la experiencia del usuario.

Los workeres están diseñados para ser robustos y resilientes, capaces de manejar con gracia fallos del sistema, problemas de red y otros errores imprevistos.

Esta resiliencia y la capacidad de escalar agregando más workeres hacen del patrón de worker y cola de trabajos una herramienta poderosa para construir aplicaciones escalables, eficientes y receptivas.

Implementación en node.js usando MongoDB y Agenda.js

Paso 1: Configurar MongoDB

Antes de usar Agenda.js, necesitas una instancia de MongoDB ya que Agenda usa MongoDB para almacenar datos de trabajos. Si aún no tienes MongoDB instalado y funcionando, debes configurarlo. Puedes instalar MongoDB localmente o usar una solución basada en la nube como MongoDB Atlas.

  1. Instalación Local de MongoDB: Sigue la guía de instalación de MongoDB para tu sistema operativo en la documentación oficial de MongoDB.

  2. MongoDB Basado en la Nube: Regístrate en MongoDB Atlas y crea un clúster. Atlas ofrece un nivel gratuito para propósitos de desarrollo.

Una vez que tu instancia de MongoDB esté lista, anota tu cadena de conexión. La necesitarás para conectar Agenda.js a tu base de datos.

Paso 2: Iniciar Tu Proyecto Node.js

Si aún no lo has hecho, crea un nuevo proyecto Node.js e inicialízalo:

mkdir mi-aplicacion-agenda
cd mi-aplicacion-agenda
npm init -y

Paso 3: Instalar Agenda.js

Instala Agenda.js y el controlador de MongoDB ejecutando:

npm install agenda mongodb

Paso 4: Configurar Agenda.js

Ahora, configurarás Agenda.js en tu aplicación Node.js. Crea un archivo llamado agendaSetup.js e inicializa Agenda con tu conexión a MongoDB:

const Agenda = require('agenda');

const connectionOpts = {
  db: { address: 'mongodb://localhost:27017/agendaDb', collection: 'trabajos' },
  processEvery: '30 segundos'
};

const agenda = new Agenda(connectionOpts);

module.exports = agenda;

Si estás usando una base de datos o host diferente, reemplaza 'mongodb://localhost:27017/agendaDb' con tu cadena de conexión a MongoDB.

Paso 5: Definir Trabajos

Con Agenda, defines trabajos especificando un nombre y una función que se llama cuando el trabajo se ejecuta. En el mismo o un archivo diferente, define un trabajo así:

const agenda = require('./agendaSetup');

agenda.define('say-hello', async trabajo => {
  console.log('¡Hola, Mundo!');
});

Paso 6: Programar Trabajos

Para programar trabajos, necesitas iniciar la agenda y luego programar tus

trabajos definidos según tus necesidades. Puedes hacer esto en un archivo app.js o al final de tu archivo agendaSetup.js:

(async function() { // IIFE para usar async/await
  await agenda.start();

  await agenda.every('1 hour', 'say-hello');
  
  console.log('Trabajo programado para say-hello cada hora.');
})();

Paso 7: Ejecutar Tu Aplicación

Ejecuta tu aplicación usando Node.js:

node app.js

Caso de uso avanzado - Enviar un correo de bienvenida después del registro de un usuario

En tu endpoint de registro simplemente llama a agenda y programa un trabajo

app.post('/registro', async (req, res) => {
  const { email } = req.body;
  
  // Aquí agregarías lógica para guardar al usuario en tu base de datos

  // Programa el trabajo 'send-welcome-meial'
  await agenda.schedule('in 2 minutes', 'send-welcome-meial', { email });

  res.status(200).send('Usuario registrado exitosamente, correo de bienvenida programado.');
});

Y tener el trabajo definido

// Define el trabajo 'send-welcome-meial'
agenda.define('send-welcome-meial', async trabajo => {
  const { email } = trabajo.attrs.data;
  console.log(`Enviando correo de bienvenida a ${email}`);
  // Aquí integrarías con tu servicio de correo
});

El Patrón CQRS y Integración con Colas de Trabajos

La Segregación de Responsabilidad de Comando y Consulta (CQRS, por sus siglas en inglés) es un patrón arquitectónico de software que separa las operaciones de lectura de datos (consultas) de las operaciones de actualización de datos (comandos), permitiendo que ambas escalen de manera independiente y optimicen el rendimiento, la complejidad y la seguridad para cada tipo de operación.

Integrar colas de trabajos con el patrón CQRS puede potenciar su efectividad, particularmente en el lado de comandos de la arquitectura. Esta integración trae varios beneficios, mejorando la escalabilidad, fiabilidad y capacidad de respuesta del sistema.

Entendiendo CQRS

CQRS se basa en el principio de que los modelos utilizados para actualizar la información no tienen que ser los mismos que los utilizados para leer la información. Esta separación permite flexibilidad en el diseño del sistema y puede mejorar el rendimiento y la escalabilidad. El patrón encaja bien con arquitecturas impulsadas por eventos y diseño dirigido por dominios (DDD), donde puede proporcionar límites y responsabilidades claros dentro del sistema.

Beneficios de Integrar Colas de Trabajos con CQRS

  1. Mejora de la Escalabilidad: Al usar colas de trabajos para manejar comandos, puedes delegar la ejecución de estos comandos a workeres en segundo plano. Esto permite al sistema manejar un alto volumen de solicitudes de escritura de manera más eficiente al distribuir la carga a través de múltiples workeres y recursos, mejorando la escalabilidad del modelo de comando.

  2. Mejora del Rendimiento: Separar comandos y consultas permite que cada uno sea optimizado para roles específicos. Las colas de trabajos pueden optimizar aún más la ejecución de comandos asegurando que las operaciones de escritura no bloqueen las operaciones de lectura, mejorando así el rendimiento general de la aplicación.

  3. Mayor Fiabilidad y Tolerancia a Fallos: Las colas de trabajos pueden reintentar automáticamente comandos fallidos, mejorando la fiabilidad del sistema. Esto es particularmente importante para operaciones que no deben fallar, como transacciones financieras o actualizaciones críticas de datos. Usar colas de trabajos asegura que los comandos puedan ser reintentados o pospuestos hasta que puedan completarse.

  4. Procesamiento Asíncrono: Integrar colas de trabajos permite que los comandos sean procesados de manera asíncrona, mejorando significativamente la experiencia del usuario al hacer la UI más receptiva. Los usuarios pueden recibir retroalimentación inmediata de sus acciones, incluso si el comando subyacente se procesa en segundo plano.

  5. Compatibilidad con Event Sourcing: CQRS a menudo se complementa con Event Sourcing, donde los cambios en el estado de la aplicación se almacenan como una secuencia de eventos

. Las colas de trabajos pueden manejar eficientemente la generación y procesamiento de estos eventos, facilitando una arquitectura robusta impulsada por eventos.

Consideraciones de Implementación

  • Manejo de Comandos: En un sistema basado en CQRS integrado con colas de trabajos, los comandos se envían a la cola de trabajos en lugar de ejecutarse directamente. Esto desacopla la emisión del comando de su ejecución, permitiendo un procesamiento más flexible y escalable.

  • Consistencia: Aunque las colas de trabajos y CQRS pueden mejorar el rendimiento y la escalabilidad, también introducen consistencia eventual en el sistema. Esto significa que el sistema podría reflejar los resultados de un comando de manera inmediata. Diseñar tu sistema para manejar o mitigar los efectos de la consistencia eventual es crucial.

  • Manejo de Errores: Se deben implementar mecanismos de manejo de errores y reintentos robustos para gestionar comandos fallidos durante la ejecución. Esto asegura que el sistema pueda recuperarse de errores de manera elegante sin perder datos o corromper el estado de la aplicación.

Ejemplo de CQRS

Para demostrar un ejemplo reproducible mínimo de una arquitectura CQRS usando Express y Agenda.js, vamos a crear una aplicación simple. Esta aplicación tendrá un comando para "crear un usuario" y una consulta para "obtener detalles del usuario". El comando "crear un usuario" será procesado de manera asíncrona usando Agenda.js.

Configura Tu Proyecto

Inicializa un nuevo proyecto Node.js (si aún no lo has hecho):

mkdir ejemplo-cqrs-agenda
cd ejemplo-cqrs-agenda
npm init -y

Instala los paquetes necesarios:

npm install express agenda mongodb body-parser

Implementando el Lado de Comandos con Agenda.js

Configura Express y Agenda.js (app.js):

const express = require('express');
const bodyParser = require('body-parser');
const { MongoClient } = require('mongodb');
const Agenda = require('agenda');

const app = express();
const port = 3000;

app.use(bodyParser.json());

const connectionStringMongo = 'mongodb://127.0.0.1/agenda';

// Inicializa conexión a MongoDB y Agenda
const cliente = new MongoClient(connectionStringMongo);
const agenda = new Agenda({ db: { address: connectionStringMongo } });

// Espacio de almacenamiento temporal para datos de usuarios
const usuarios = {};

// Define un trabajo para crear un usuario en Agenda
agenda.define('crear usuario', async (trabajo) => {
 const { userId, userName } = trabajo.attrs.data;
 // Simula un retraso en la creación del usuario
 await new Promise(resolve => setTimeout(resolve, 1000));
 usuarios[userId] = { userId, userName };
 console.log(`Usuario creado: ${userName}`);
});

(async function() { // Función autoinvocada async para asegurar un inicio adecuado
 await cliente.connect();
 await agenda.start();
 console.log('Agenda iniciada');
})();

// API de Comando para crear un usuario
app.post('/usuarios', async (req, res) => {
 const { userId, userName } = req.body;
 await agenda.schedule('en 2 segundos', 'crear usuario', { userId, userName });
 res.send({ mensaje: `Creación de usuario programada para ${userName}` });
});

// API de Consulta para obtener un usuario
app.get('/usuarios/:userId', (req, res) => {
 const { userId } = req.params;
 const usuario = usuarios[userId];
 if (usuario) {
 res.send(usuario);
 } else {
 res.status(404).send({ mensaje: 'Usuario no encontrado' });
 }
});

app.listen(port, () => {
 console.log(`Aplicación de ejemplo escuchando en http://localhost:${port}`);
});

Explicación

  • Configuración de MongoDB y Agenda: Este ejemplo se conecta a MongoDB, inicializa Agenda con la conexión y define un trabajo para crear un usuario. El objeto usuarios actúa como un simple almacén en memoria.

  • Endpoint de Comando: El endpoint POST /usuarios recibe un userId y userName, programa un trabajo de "crear usuario" con Agenda y responde inmediatamente, reconociendo la programación.

  • Endpoint de Consulta: El endpoint GET /usuarios/:userId busca y devuelve los detalles del usuario del almacén en memoria. Si el usuario no existe, devuelve un error 404.

  • Procesamiento de Trabajo Asíncrono: El trabajo de "crear usuario" simula un retraso, imitando una tarea que consume tiempo como enviar un correo de bienvenida o procesar datos adicionales. Una

vez que se ejecuta el trabajo, añade al usuario al almacén en memoria. Ejecución del Ejemplo

Asegúrate de que MongoDB esté corriendo localmente.

Inicia tu aplicación con node app.js.

Usa una herramienta como Postman o curl para probar los endpoints de comando y consulta:

Para crear un usuario: POST http://localhost:3000/usuarios con cuerpo JSON {"userId": "1", "userName": "Juan Pérez"}.

Para obtener un usuario: GET http://localhost:3000/usuarios/1.

Este ejemplo ilustra un patrón CQRS básico con procesamiento de comando asíncrono usando Express y Agenda.js.

Demuestra cómo los comandos pueden manejarse separadamente de las consultas, permitiendo aplicaciones más escalables y responsivas.

Ejemplo Avanzado - CQRS para Web Scraping

Para este ejemplo, diseñaremos una aplicación simple basada en CQRS que programa tareas de scraping web usando Playwright, rastrea el estado de estos trabajos y recupera sus resultados.

Esto implicará crear un comando para programar un trabajo de scraping, y consultas para verificar el estado del trabajo y obtener resultados. Usaremos Express.js para el servidor web, Agenda.js para la cola de trabajos y Playwright para el scraping web.

Configuración

Inicializa un nuevo proyecto Node.js:


mkdir cqrs-scraping
cd cqrs-scraping
npm init -y

Instala los paquetes necesarios:

npm install express agenda mongodb body-parser playwright

Implementación

Configura Express y Agenda.js (server.js):


const express = require('express');
const bodyParser = require('body-parser');
const { MongoClient } = require('mongodb');
const Agenda = require('agenda');
const { chromium } = require('playwright');

const app = express();
app.use(bodyParser.json());

const connectionStringMongo = 'mongodb://127.0.0.1/agenda';
const agenda = new Agenda({ db: { address: connectionStringMongo } });

const resultadosTrabajos = {}; // Almacena resultados de trabajos con ID de trabajo como clave

// Define un trabajo para scraping web
agenda.define('scraping web', async (trabajo) => {
  const { url } = trabajo.attrs.data;
  const navegador = await chromium.launch();
  const página = await navegador.newPage();
  await página.goto(url);
  const contenido = await página.content(); // Lógica de scraping simplificada
  await navegador.close();

  // Almacena resultado con ID de trabajo para su recuperación
  resultadosTrabajos[trabajo.attrs._id] = contenido;
  console.log(`Scraping completado para trabajo ${trabajo.attrs._id}`);
});

(async function() {
  await agenda.start();
  console.log('Agenda iniciada');
})();

// Endpoint para programar scraping web
app.post('/scrape', async (req, res) => {
  const { url } = req.body;
  const trabajo = await agenda.now('scraping web', { url });
  res.send({ mensaje: 'Trabajo de scraping programado', jobId: trabajo.attrs._id });
});

// Endpoint para verificar estado del trabajo
app.get('/estado/:jobId', (req, res) => {
  const { jobId } = req.params;
  if (resultadosTrabajos[jobId]) {
    res.send({ estado: 'Completado' });
  } else {
    res.send({ estado: 'En Progreso' });
  }
});

// Endpoint para obtener resultado del trabajo
app.get('/resultado/:jobId', (req, res) => {
  const { jobId } = req.params;
  const resultado = resultadosTrabajos[jobId];
  if (resultado) {
    res.send({ resultado });
  } else {
    res.status(404).send({ mensaje: 'Resultado no encontrado' });
  }
});

const puerto = 3000;
app.listen(puerto, () => console.log(`Servidor corriendo en puerto ${puerto}`));

Cómo Funciona

  • Trabajo de Scraping Web: La función agenda.define define un trabajo para scraping web. Utiliza Playwright para navegar a la URL especificada y almacenar el contenido de la página en resultadosTrabajos, con la clave siendo el ID único del trabajo.

  • Endpoint de Programación (/scrape): Este endpoint programa un nuevo trabajo de scraping web para la URL dada y devuelve el ID del trabajo. Los clientes pueden usar este ID para verificar el estado del trabajo y recuperar resultados.

  • Endpoint de Verificación de Estado (/estado/:jobId): Los clientes pueden usar este endpoint para verificar si el trabajo de scraping ha sido completado.

  • Endpoint de Recuperación de Resultados (/resultado/:jobId): Una vez que un trabajo se completa, los clientes pueden recuperar el contenido raspado a través de este endpoint usando el ID del trabajo.

Ejecución del Ejemplo

Inicia MongoDB localmente si aún no está corriendo.

Ejecuta el script del servidor:

node server.js

Uso

Programa un trabajo de scraping web enviando una solicitud POST a /scrape con un cuerpo JSON conteniendo la URL para raspar.

Verifica el estado del trabajo enviando una solicitud GET a /estado/:jobId usando el ID del trabajo devuelto del paso anterior.

Recupera el resultado del trabajo enviando una solicitud GET a /resultado/:jobId una vez que el trabajo se haya completado.

Conclusión

Las colas de trabajos y el patrón de Segregación de Responsabilidad de Comando y Consulta (CQRS) representan opciones arquitectónicas poderosas que pueden mejorar significativamente la escalabilidad, rendimiento y mantenibilidad de sistemas de software, especialmente en entornos complejos y distribuidos como los microservicios.

Cuando se implementan de manera reflexiva, estos patrones facilitan un alto grado de desacoplamiento entre componentes, permitiendo una escalabilidad más granular, mejor tolerancia a fallos y mayor flexibilidad para responder a requisitos o cargas de trabajo cambiantes.

Get the latest articles in your inbox.

Join the other 2000+ savvy node.js developers who get article updates. You will receive only high-quality articles about Node.js, Cloud Computing and Javascript front-end frameworks.


santypk4

CEO at Softwareontheroad.com