diff --git a/REFERENCE.md b/REFERENCE.md index 8b4782b59..8b7426d6e 100644 --- a/REFERENCE.md +++ b/REFERENCE.md @@ -105,6 +105,7 @@ interface AdvancedSettings { retryProcessDelay: number = 5000; // delay before processing next job in case of internal error. backoffStrategies: {}; // A set of custom backoff strategies keyed by name. drainDelay: number = 5; // A timeout for when the queue is in drained state (empty waiting for jobs). + reuseProcesses: boolean = true; // Flag if true then child processes are kept for each processor, false kills child processes after finish } ``` diff --git a/lib/process/child-pool.js b/lib/process/child-pool.js index 7f4cca52f..d3ff85a18 100644 --- a/lib/process/child-pool.js +++ b/lib/process/child-pool.js @@ -13,6 +13,9 @@ const ChildPool = function ChildPool() { return new ChildPool(); } + // Flag for keeping or killing processes after finished processing, default keep always + this.reuseProcesses = true; + this.retained = {}; this.free = {}; }; @@ -39,6 +42,10 @@ const convertExecArgv = function(execArgv) { }); }; +ChildPool.prototype.setReuseProcesses = function(reuseProcesses) { + this.reuseProcesses = reuseProcesses; +} + ChildPool.prototype.retain = function(processFile) { const _this = this; let child = _this.getFree(processFile).pop(); @@ -65,8 +72,12 @@ ChildPool.prototype.retain = function(processFile) { }; ChildPool.prototype.release = function(child) { - delete this.retained[child.pid]; - this.getFree(child.processFile).push(child); + if (this.reuseProcesses === true) { + delete this.retained[child.pid]; + this.getFree(child.processFile).push(child); + } else { + this.kill(child); + } }; ChildPool.prototype.remove = function(child) { diff --git a/lib/queue.js b/lib/queue.js index 33be0970e..c2b023ea9 100755 --- a/lib/queue.js +++ b/lib/queue.js @@ -211,7 +211,8 @@ const Queue = function Queue(name, url, opts) { guardInterval: 5000, retryProcessDelay: 5000, drainDelay: 5, - backoffStrategies: {} + backoffStrategies: {}, + reuseProcesses: true }); this.settings.lockRenewTime = @@ -650,6 +651,7 @@ Queue.prototype.setHandler = function(name, handler) { } this.childPool = this.childPool || require('./process/child-pool')(); + this.childPool.setReuseProcesses(this.settings.reuseProcesses); const sandbox = require('./process/sandbox'); this.handlers[name] = sandbox(handler, this.childPool).bind(this); diff --git a/test/test_child-pool.js b/test/test_child-pool.js index 47880cf49..d3575289a 100644 --- a/test/test_child-pool.js +++ b/test/test_child-pool.js @@ -141,4 +141,18 @@ describe('Child pool', () => { expect(children).to.include(child); }); }); + + it('should kill child after processing is finished and not retain it', function() { + const processor = __dirname + '/fixtures/fixture_processor_bar.js'; + + pool.setReuseProcesses(false); + + return pool.retain(processor).then(_child => { + expect(_child).to.be.ok; + pool.release(_child); + + expect(pool.retained).to.be.empty; + expect(pool.free[processor]).to.be.empty; + }); + }); });