From ac446e48e5e8d3ca19bee1456a6ecbf10454859f Mon Sep 17 00:00:00 2001 From: "davis.jaunzems" Date: Mon, 14 Jan 2019 11:56:58 +0100 Subject: [PATCH 1/4] fix(pool): added optional setting for killing processes and not retaining them --- REFERENCE.md | 1 + lib/process/child-pool.js | 15 +++++++++++++-- lib/queue.js | 15 ++++++++++++++- test/test_child-pool.js | 16 ++++++++++++++++ 4 files changed, 44 insertions(+), 3 deletions(-) diff --git a/REFERENCE.md b/REFERENCE.md index a033dc459..e57adac50 100644 --- a/REFERENCE.md +++ b/REFERENCE.md @@ -96,6 +96,7 @@ interface AdvancedSettings { retryProcessDelay: number = 5000; // delay before processing next job in case of internal error. backoffStrategies: {}; // A set of custom backoff strategies keyed by name. drainDelay: number = 5; // A timeout for when the queue is in drained state (empty waiting for jobs). + keepProcesses: boolean = true; // Flag if true then child processes are kept for each processor, false kills child processes after finish } ``` diff --git a/lib/process/child-pool.js b/lib/process/child-pool.js index 122a03f0b..5b21c9e97 100644 --- a/lib/process/child-pool.js +++ b/lib/process/child-pool.js @@ -10,10 +10,17 @@ var ChildPool = function ChildPool() { return new ChildPool(); } + // Flag for keeping or killing processes after finished processing, default keep always + this.keepProcesses = true; + this.retained = {}; this.free = {}; }; +ChildPool.prototype.setKeepProcesses = function(keepProcesses) { + this.keepProcesses = keepProcesses; +}; + ChildPool.prototype.retain = function(processFile) { var _this = this; var child = _this.getFree(processFile).pop(); @@ -42,8 +49,12 @@ ChildPool.prototype.retain = function(processFile) { }; ChildPool.prototype.release = function(child) { - delete this.retained[child.pid]; - this.getFree(child.processFile).push(child); + if (this.keepProcesses === true) { + delete this.retained[child.pid]; + this.getFree(child.processFile).push(child); + } else { + this.kill(child); + } }; ChildPool.prototype.remove = function(child) { diff --git a/lib/queue.js b/lib/queue.js index dad3772a8..a03223d50 100755 --- a/lib/queue.js +++ b/lib/queue.js @@ -229,7 +229,8 @@ var Queue = function Queue(name, url, opts) { guardInterval: 5000, retryProcessDelay: 5000, drainDelay: 5, - backoffStrategies: {} + backoffStrategies: {}, + keepProcesses: true }); this.settings.lockRenewTime = @@ -250,6 +251,13 @@ var Queue = function Queue(name, url, opts) { this.processJob = this.processJob.bind(this); this.getJobFromId = Job.fromId.bind(null, this); + // Check settings to see if processes will be kept after finishing processing, default set to true + this.keepProcesses = + this.settings.keepProcesses !== undefined && + this.settings.keepProcesses === false + ? false + : true; + var keys = {}; _.each( [ @@ -672,6 +680,10 @@ Queue.prototype.start = function(concurrency) { }); }; +Queue.prototype.setKeepProcesses = function(keepProcesses) { + this.keepProcesses = keepProcesses; +}; + Queue.prototype.setHandler = function(name, handler) { if (!handler) { throw new Error('Cannot set an undefined handler'); @@ -691,6 +703,7 @@ Queue.prototype.setHandler = function(name, handler) { } this.childPool = this.childPool || require('./process/child-pool')(); + this.childPool.setKeepProcesses(this.keepProcesses); var sandbox = require('./process/sandbox'); this.handlers[name] = sandbox(handler, this.childPool).bind(this); diff --git a/test/test_child-pool.js b/test/test_child-pool.js index 5f54e8681..e78a1302c 100644 --- a/test/test_child-pool.js +++ b/test/test_child-pool.js @@ -142,4 +142,20 @@ describe('Child pool', function() { expect(children).to.include(child); }); }); + + it('should kill child after processing is finished and not retain it', function() { + var processor = __dirname + '/fixtures/fixture_processor_bar.js'; + var child; + + pool.setKeepProcesses(false); + + return pool.retain(processor).then(function(_child) { + expect(_child).to.be.ok; + child = _child; + pool.release(child); + + expect(pool.retained).to.be.empty; + expect(pool.free[processor]).to.be.empty; + }); + }); }); From ff7774a5be8f8c3301c902bc22fe22534c167ec2 Mon Sep 17 00:00:00 2001 From: "davis.jaunzems" Date: Fri, 18 Jan 2019 08:40:03 +0100 Subject: [PATCH 2/4] fix(test): removed unnecessary variable reassigning --- test/test_child-pool.js | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/test/test_child-pool.js b/test/test_child-pool.js index e78a1302c..3f6b1e339 100644 --- a/test/test_child-pool.js +++ b/test/test_child-pool.js @@ -145,14 +145,12 @@ describe('Child pool', function() { it('should kill child after processing is finished and not retain it', function() { var processor = __dirname + '/fixtures/fixture_processor_bar.js'; - var child; pool.setKeepProcesses(false); return pool.retain(processor).then(function(_child) { expect(_child).to.be.ok; - child = _child; - pool.release(child); + pool.release(_child); expect(pool.retained).to.be.empty; expect(pool.free[processor]).to.be.empty; From a8d3cd0213e35bf346ecbe86dcd35adb60cac4b2 Mon Sep 17 00:00:00 2001 From: "davis.jaunzems" Date: Mon, 21 Jan 2019 09:23:57 +0100 Subject: [PATCH 3/4] rendamed functions and variables, removed redundant code --- REFERENCE.md | 2 +- lib/process/child-pool.js | 8 ++++---- lib/queue.js | 15 ++------------- test/test_child-pool.js | 2 +- 4 files changed, 8 insertions(+), 19 deletions(-) diff --git a/REFERENCE.md b/REFERENCE.md index e57adac50..df1ff07cc 100644 --- a/REFERENCE.md +++ b/REFERENCE.md @@ -96,7 +96,7 @@ interface AdvancedSettings { retryProcessDelay: number = 5000; // delay before processing next job in case of internal error. backoffStrategies: {}; // A set of custom backoff strategies keyed by name. drainDelay: number = 5; // A timeout for when the queue is in drained state (empty waiting for jobs). - keepProcesses: boolean = true; // Flag if true then child processes are kept for each processor, false kills child processes after finish + reuseProcesses: boolean = true; // Flag if true then child processes are kept for each processor, false kills child processes after finish } ``` diff --git a/lib/process/child-pool.js b/lib/process/child-pool.js index 5b21c9e97..18d96ab1f 100644 --- a/lib/process/child-pool.js +++ b/lib/process/child-pool.js @@ -11,14 +11,14 @@ var ChildPool = function ChildPool() { } // Flag for keeping or killing processes after finished processing, default keep always - this.keepProcesses = true; + this.reuseProcesses = true; this.retained = {}; this.free = {}; }; -ChildPool.prototype.setKeepProcesses = function(keepProcesses) { - this.keepProcesses = keepProcesses; +ChildPool.prototype.setReuseProcesses = function(reuseProcesses) { + this.reuseProcesses = reuseProcesses; }; ChildPool.prototype.retain = function(processFile) { @@ -49,7 +49,7 @@ ChildPool.prototype.retain = function(processFile) { }; ChildPool.prototype.release = function(child) { - if (this.keepProcesses === true) { + if (this.reuseProcesses === true) { delete this.retained[child.pid]; this.getFree(child.processFile).push(child); } else { diff --git a/lib/queue.js b/lib/queue.js index a03223d50..c028b3f35 100755 --- a/lib/queue.js +++ b/lib/queue.js @@ -230,7 +230,7 @@ var Queue = function Queue(name, url, opts) { retryProcessDelay: 5000, drainDelay: 5, backoffStrategies: {}, - keepProcesses: true + reuseProcesses: true }); this.settings.lockRenewTime = @@ -251,13 +251,6 @@ var Queue = function Queue(name, url, opts) { this.processJob = this.processJob.bind(this); this.getJobFromId = Job.fromId.bind(null, this); - // Check settings to see if processes will be kept after finishing processing, default set to true - this.keepProcesses = - this.settings.keepProcesses !== undefined && - this.settings.keepProcesses === false - ? false - : true; - var keys = {}; _.each( [ @@ -680,10 +673,6 @@ Queue.prototype.start = function(concurrency) { }); }; -Queue.prototype.setKeepProcesses = function(keepProcesses) { - this.keepProcesses = keepProcesses; -}; - Queue.prototype.setHandler = function(name, handler) { if (!handler) { throw new Error('Cannot set an undefined handler'); @@ -703,7 +692,7 @@ Queue.prototype.setHandler = function(name, handler) { } this.childPool = this.childPool || require('./process/child-pool')(); - this.childPool.setKeepProcesses(this.keepProcesses); + this.childPool.setReuseProcesses(this.settings.reuseProcesses); var sandbox = require('./process/sandbox'); this.handlers[name] = sandbox(handler, this.childPool).bind(this); diff --git a/test/test_child-pool.js b/test/test_child-pool.js index 3f6b1e339..defaed9b8 100644 --- a/test/test_child-pool.js +++ b/test/test_child-pool.js @@ -146,7 +146,7 @@ describe('Child pool', function() { it('should kill child after processing is finished and not retain it', function() { var processor = __dirname + '/fixtures/fixture_processor_bar.js'; - pool.setKeepProcesses(false); + pool.setReuseProcesses(false); return pool.retain(processor).then(function(_child) { expect(_child).to.be.ok; From 2b5bfdc46831f5659f242ca48a93e79cb14ca11b Mon Sep 17 00:00:00 2001 From: "davis.jaunzems" Date: Thu, 7 Feb 2019 09:03:51 +0100 Subject: [PATCH 4/4] updated test code --- test/test_child-pool.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/test_child-pool.js b/test/test_child-pool.js index defaed9b8..bc6416823 100644 --- a/test/test_child-pool.js +++ b/test/test_child-pool.js @@ -144,11 +144,11 @@ describe('Child pool', function() { }); it('should kill child after processing is finished and not retain it', function() { - var processor = __dirname + '/fixtures/fixture_processor_bar.js'; + const processor = __dirname + '/fixtures/fixture_processor_bar.js'; pool.setReuseProcesses(false); - return pool.retain(processor).then(function(_child) { + return pool.retain(processor).then(_child => { expect(_child).to.be.ok; pool.release(_child);