pm2部署nodejs项目时任务重复执行
在使用PM2部署Node.js项目时,如果出现任务重复执行的问题,通常是因为多个进程实例(workers)同时运行导致的。解决方案的核心是限制某些任务只在一个进程中执行。详细探讨几种解决此问题的方法,并提供相应的代码示例。
1. 使用`cluster_mode`模式并限制任务执行
PM2支持cluster_mode
模式,可以自动启动多个进程来提高性能。但默认情况下,所有进程都会执行相同的代码逻辑,这可能导致任务重复执行。
解决思路
可以通过检查当前进程的process.env.NODE_APP_INSTANCE
值,确保只有主进程或某个特定进程执行任务。
示例代码
javascript
const os = require('os');
const numCPUs = os.cpus().length;</p>
<p>if (process.env.NODE<em>APP</em>INSTANCE === '0') {
console.log('Only this instance will execute the task.');
// 在这里放置需要单次执行的任务
setInterval(() => {
console.log('This task runs only on one worker.');
}, 5000);
}</p>
<p>// 其他代码逻辑
console.log('All workers can execute this part.');
说明
process.env.NODE_APP_INSTANCE
是 PM2 提供的环境变量,用于标识当前进程的实例编号。- 在上面的代码中,我们仅允许
NODE_APP_INSTANCE
为0
的进程执行特定任务。
2. 使用Redis或数据库进行任务锁定
当多个进程同时运行时,可以通过外部存储(如Redis或数据库)实现分布式锁机制,确保任务只被一个进程执行。
解决思路
使用Redis的SETNX
命令创建一个键值对,只有个尝试设置该键的进程能够成功,其他进程会被阻塞或跳过任务。
示例代码
javascript const redis = require('redis'); const client = redis.createClient();</p> <p>function runTaskWithLock(taskName, taskCallback) { const lockKey = <code>task_lock:${taskName}
; const lockTTL = 60; // 锁的过期时间(秒)client.setnx(lockKey, Date.now(), (err, reply) => { if (reply === 1) { // 如果成功设置锁 client.expire(lockKey, lockTTL); // 设置锁的有效期 console.log(
Process ${process.pid} acquired the lock for task: ${taskName}
); taskCallback(); // 执行任务 setTimeout(() => { client.del(lockKey); // 任务完成后释放锁 console.log(Process ${process.pid} released the lock for task: ${taskName}
); }, 5000); // 假设任务执行时间为5秒 } else { console.log(Process ${process.pid} could not acquire the lock for task: ${taskName}
); } }); }// 调用示例 runTaskWithLock('my_task', () => { console.log('Executing task...'); });
说明
SETNX
命令确保只有一个进程能成功设置锁。- 锁的过期时间防止死锁(例如进程崩溃后未能释放锁)。
3. 单独启动一个进程处理特定任务
如果某些任务不适合与其他进程共享,可以单独启动一个进程专门处理这些任务。
解决思路
通过PM2配置文件,将任务分离到不同的进程组中。
示例代码
配置文件ecosystem.config.js
javascript
module.exports = {
apps: [
{
name: 'main-app',
script: 'app.js',
instances: 'max', // 启动多个进程
exec_mode: 'cluster',
},
{
name: 'task-handler',
script: 'taskHandler.js',
instances: 1, // 只启动一个进程
exec_mode: 'fork',
},
],
};
主应用app.js
javascript
console.log('Main application running...');
任务处理器taskHandler.js
javascript
setInterval(() => {
console.log('This task runs in a dedicated process.');
}, 5000);
说明
instances: 'max'
表示根据CPU核心数启动多个进程。instances: 1
确保任务处理器只运行一个实例。
4. 使用消息队列解耦任务
对于复杂的任务调度需求,可以引入消息队列(如RabbitMQ、Kafka等),让任务由专门的消费者进程处理。
解决思路
主应用将任务发送到消息队列,消费者进程从队列中获取任务并执行。这样即使有多个生产者,任务也不会重复执行。
示例代码
发送任务到队列
javascript
const amqplib = require('amqplib');</p>
<p>async function sendTaskToQueue() {
const connection = await amqplib.connect('amqp://localhost');
const channel = await connection.createChannel();
const queueName = 'task_queue';</p>
<p>await channel.assertQueue(queueName, { durable: true });
const message = JSON.stringify({ task: 'example_task' });</p>
<p>channel.sendToQueue(queueName, Buffer.from(message));
console.log('Task sent to queue.');
}</p>
<p>sendTaskToQueue();
消费任务
javascript
const amqplib = require('amqplib');</p>
<p>async function consumeTasks() {
const connection = await amqplib.connect('amqp://localhost');
const channel = await connection.createChannel();
const queueName = 'task_queue';</p>
<p>await channel.assertQueue(queueName, { durable: true });
console.log('Waiting for tasks.');</p>
<p>channel.consume(queueName, (msg) => {
if (msg !== null) {
const task = JSON.parse(msg.content.toString());
console.log('Received task:', task);
channel.ack(msg); // 确认任务已处理
}
});
}</p>
<p>consumeTasks();
说明
- 消息队保任务只会被一个消费者处理。
- 这种方式适合高并发场景下的任务分发。
在PM2部署Node.js项目时,任务重复执行的问题可以通过以下几种方法解决:
1. 限制任务在特定进程中执行:利用process.env.NODE_APP_INSTANCE
。
2. 使用Redis或数据库实现分布式锁:确保任务只被一个进程执行。
3. 分离任务到独立进程:通过PM2配置文件启动专用任务处理器。
4. 引入消息队列:解耦任务生产和消费,避免重复执行。
根据实际需求选择合适的方案,可以有效避免任务重复执行的问题。