Skip to content

Commit

Permalink
Merge pull request #12237 from Budibase/fix/update-bull-queue-parameters
Browse files Browse the repository at this point in the history
Updating bull parameters - help queue stalling
  • Loading branch information
mike12345567 authored Nov 1, 2023
2 parents 0477ed6 + 5adea6c commit 0fe98f4
Show file tree
Hide file tree
Showing 6 changed files with 88 additions and 4 deletions.
1 change: 1 addition & 0 deletions packages/backend-core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ export * as timers from "./timers"
export { default as env } from "./environment"
export * as blacklist from "./blacklist"
export * as docUpdates from "./docUpdates"
export * from "./utils/Duration"
export { SearchParams } from "./db"
// Add context to tenancy for backwards compatibility
// only do this for external usages to prevent internal
Expand Down
2 changes: 1 addition & 1 deletion packages/backend-core/src/queue/inMemoryQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class InMemoryQueue {
* @param opts This is not used by the in memory queue as there is no real use
* case when in memory, but is the same API as Bull
*/
constructor(name: string, opts = null) {
constructor(name: string, opts?: any) {
this._name = name
this._opts = opts
this._messages = []
Expand Down
20 changes: 17 additions & 3 deletions packages/backend-core/src/queue/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,18 @@ import env from "../environment"
import { getRedisOptions } from "../redis/utils"
import { JobQueue } from "./constants"
import InMemoryQueue from "./inMemoryQueue"
import BullQueue from "bull"
import BullQueue, { QueueOptions } from "bull"
import { addListeners, StalledFn } from "./listeners"
import { Duration } from "../utils"
import * as timers from "../timers"
import * as Redis from "ioredis"

const CLEANUP_PERIOD_MS = 60 * 1000
// the queue lock is held for 5 minutes
const QUEUE_LOCK_MS = Duration.fromMinutes(5).toMs()
// queue lock is refreshed every 30 seconds
const QUEUE_LOCK_RENEW_INTERNAL_MS = Duration.fromSeconds(30).toMs()
// cleanup the queue every 60 seconds
const CLEANUP_PERIOD_MS = Duration.fromSeconds(60).toMs()
let QUEUES: BullQueue.Queue[] | InMemoryQueue[] = []
let cleanupInterval: NodeJS.Timeout

Expand All @@ -21,7 +28,14 @@ export function createQueue<T>(
opts: { removeStalledCb?: StalledFn } = {}
): BullQueue.Queue<T> {
const { opts: redisOpts, redisProtocolUrl } = getRedisOptions()
const queueConfig: any = redisProtocolUrl || { redis: redisOpts }
const queueConfig: QueueOptions = {
redis: redisProtocolUrl! || (redisOpts as Redis.RedisOptions),
settings: {
maxStalledCount: 0,
lockDuration: QUEUE_LOCK_MS,
lockRenewTime: QUEUE_LOCK_RENEW_INTERNAL_MS,
},
}
let queue: any
if (!env.isTest()) {
queue = new BullQueue(jobQueue, queueConfig)
Expand Down
49 changes: 49 additions & 0 deletions packages/backend-core/src/utils/Duration.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
export enum DurationType {
MILLISECONDS = "milliseconds",
SECONDS = "seconds",
MINUTES = "minutes",
HOURS = "hours",
DAYS = "days",
}

const conversion: Record<DurationType, number> = {
milliseconds: 1,
seconds: 1000,
minutes: 60 * 1000,
hours: 60 * 60 * 1000,
days: 24 * 60 * 60 * 1000,
}

export class Duration {
static convert(from: DurationType, to: DurationType, duration: number) {
const milliseconds = duration * conversion[from]
return milliseconds / conversion[to]
}

static from(from: DurationType, duration: number) {
return {
to: (to: DurationType) => {
return Duration.convert(from, to, duration)
},
toMs: () => {
return Duration.convert(from, DurationType.MILLISECONDS, duration)
},
}
}

static fromSeconds(duration: number) {
return Duration.from(DurationType.SECONDS, duration)
}

static fromMinutes(duration: number) {
return Duration.from(DurationType.MINUTES, duration)
}

static fromHours(duration: number) {
return Duration.from(DurationType.HOURS, duration)
}

static fromDays(duration: number) {
return Duration.from(DurationType.DAYS, duration)
}
}
1 change: 1 addition & 0 deletions packages/backend-core/src/utils/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
export * from "./hashing"
export * from "./utils"
export * from "./stringUtils"
export * from "./Duration"
19 changes: 19 additions & 0 deletions packages/backend-core/src/utils/tests/Duration.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { Duration, DurationType } from "../Duration"

describe("duration", () => {
it("should convert minutes to milliseconds", () => {
expect(Duration.fromMinutes(5).toMs()).toBe(300000)
})

it("should convert seconds to milliseconds", () => {
expect(Duration.fromSeconds(30).toMs()).toBe(30000)
})

it("should convert days to milliseconds", () => {
expect(Duration.fromDays(1).toMs()).toBe(86400000)
})

it("should convert minutes to days", () => {
expect(Duration.fromMinutes(1440).to(DurationType.DAYS)).toBe(1)
})
})

0 comments on commit 0fe98f4

Please sign in to comment.