Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(queue-keys): reuse getQueueQualifiedName method #2109

Merged
merged 3 commits into from
Aug 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions python/bullmq/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ def __init__(self, queue: Queue, name: str, data: Any, opts: JobOptions = {}):
self.parent = {"id": parent.get("id"), "queueKey": parent.get("queue")} if parent else None
self.stacktrace: List[str] = []
self.scripts = Scripts(queue.prefix, queue.name, queue.redisConnection)
self.queueQualifiedName = queue.qualifiedName

def updateData(self, data):
self.data = data
Expand Down Expand Up @@ -152,10 +153,6 @@ async def saveStacktrace(self, pipe, err:str):
def moveToWaitingChildren(self, token, opts:dict):
return self.scripts.moveToWaitingChildren(self.id, token, opts)

@property
def queueQualifiedName(self):
return f"{self.queue.prefix}:{self.queue.name}"

@staticmethod
def fromJSON(queue: Queue, rawData: dict, jobId: str | None = None):
"""
Expand Down
11 changes: 8 additions & 3 deletions python/bullmq/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ def __init__(self, name: str, redisOpts: dict | str = {}, opts: QueueOptions = {
self.prefix = opts.get("prefix", "bull")
self.scripts = Scripts(
self.prefix, name, self.redisConnection)
self.keys = self.scripts.queue_keys.getKeys(name)
self.qualifiedName = self.scripts.queue_keys.getQueueQualifiedName(name)

def toKey(self, type: str):
return self.scripts.queue_keys.toKey(self.name, type)

async def add(self, name: str, data, opts: JobOptions = {}):
"""
Expand Down Expand Up @@ -63,7 +68,7 @@ async def isPaused(self):
"""
Returns true if the queue is currently paused.
"""
paused_key_exists = await self.client.hexists(f"{self.prefix}:{self.name}:meta", "paused")
paused_key_exists = await self.client.hexists(self.keys["meta"], "paused")
return paused_key_exists == 1

async def obliterate(self, force: bool = False):
Expand Down Expand Up @@ -103,13 +108,13 @@ def trimEvents(self, maxLength: int):

@param maxLength:
"""
return self.client.xtrim(f"{self.prefix}:{self.name}:events", maxlen = maxLength, approximate = "~")
return self.client.xtrim(self.keys["events"], maxlen = maxLength, approximate = "~")

def removeDeprecatedPriorityKey(self):
"""
Delete old priority helper key.
"""
return self.client.delete(f"{self.prefix}:{self.name}:priority")
return self.client.delete(self.toKey("priority"))

async def getJobCountByTypes(self, *types):
result = await self.getJobCounts(*types)
Expand Down
22 changes: 22 additions & 0 deletions python/bullmq/queue_keys.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
class QueueKeys:
"""
This class handles all keys parser logic.
"""

def __init__(self, prefix: str = 'bull'):
self.prefix = prefix

def getKeys(self, name: str):
names = ["", "active", "wait", "waiting-children", "paused", "completed", "failed", "delayed",
"stalled", "limiter", "prioritized", "id", "stalled-check", "meta", "pc", "events"]
keys = {}
for name_type in names:
keys[name_type] = self.toKey(name, name_type)

return keys

def toKey(self, name: str, name_type: str):
return f"{self.getQueueQualifiedName(name)}:{name_type}"

def getQueueQualifiedName(self, name: str):
return f"{self.prefix}:{name}"
10 changes: 4 additions & 6 deletions python/bullmq/scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from __future__ import annotations
from redis import Redis
from bullmq.queue_keys import QueueKeys
from bullmq.error_code import ErrorCode
from bullmq.utils import isRedisVersionLowerThan, get_parent_key
from typing import Any, TYPE_CHECKING
Expand Down Expand Up @@ -53,14 +54,11 @@ def __init__(self, prefix: str, queueName: str, redisConnection: RedisConnection
"updateProgress": self.redisClient.register_script(self.getScript("updateProgress-2.lua")),
}

# loop all the names and add them to the keys object
names = ["", "active", "wait", "waiting-children", "paused", "completed", "failed", "delayed",
"stalled", "limiter", "prioritized", "id", "stalled-check", "meta", "pc", "events", "waiting-children"]
for name in names:
self.keys[name] = self.toKey(name)
self.queue_keys = QueueKeys(prefix)
self.keys = self.queue_keys.getKeys(queueName)

def toKey(self, name: str):
return f"{self.prefix}:{self.queueName}:{name}"
return self.queue_keys.toKey(self.queueName, name)

def getScript(self, name: str):
"""
Expand Down
1 change: 1 addition & 0 deletions python/bullmq/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def __init__(self, name: str, processor: Callable[[Job, str], asyncio.Future], o
self.blockUntil = 0
self.limitUntil = 0
self.drained = False
self.qualifiedName = self.scripts.queue_keys.getQueueQualifiedName(name)

if opts.get("autorun", True):
asyncio.ensure_future(self.run())
Expand Down
3 changes: 2 additions & 1 deletion src/classes/flow-producer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ export class FlowProducer extends EventEmitter {
parent: {
parentOpts: {
id: parentId,
queue: queueKeysParent.getPrefixedQueueName(node.queueName),
queue: queueKeysParent.getQueueQualifiedName(node.queueName),
},
parentDependenciesKey,
},
Expand Down Expand Up @@ -433,6 +433,7 @@ export class FlowProducer extends EventEmitter {
keys: queueKeys.getKeys(node.queueName),
toKey: (type: string) => queueKeys.toKey(node.queueName, type),
opts: { prefix },
qualifiedName: queueKeys.getQueueQualifiedName(node.queueName),
closing: this.closing,
waitUntilReady: async () => this.connection.client,
removeListener: this.removeListener.bind(this) as any,
Expand Down
16 changes: 8 additions & 8 deletions src/classes/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ export class Job<
NameType extends string = string,
> implements MinimalJob<DataType, ReturnType, NameType>
{
/**
* It includes the prefix, the namespace separator :, and queue name.
* @see https://www.gnu.org/software/gawk/manual/html_node/Qualified-Names.html
*/
public readonly queueQualifiedName: string;

/**
* The progress a job has performed so far.
* @defaultValue 0
Expand Down Expand Up @@ -183,6 +189,8 @@ export class Job<

this.toKey = queue.toKey.bind(queue);
this.scripts = new Scripts(queue);

this.queueQualifiedName = queue.qualifiedName;
}

/**
Expand Down Expand Up @@ -715,14 +723,6 @@ export class Job<
return this.queue.opts.prefix;
}

/**
* @returns it includes the prefix, the namespace separator :, and queue name.
* @see https://www.gnu.org/software/gawk/manual/html_node/Qualified-Names.html
*/
get queueQualifiedName(): string {
return `${this.prefix}:${this.queueName}`;
}

/**
* Get current state.
*
Expand Down
2 changes: 2 additions & 0 deletions src/classes/queue-base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ export class QueueBase extends EventEmitter implements MinimalQueue {

protected scripts: Scripts;
protected connection: RedisConnection;
public readonly qualifiedName: string;

/**
*
Expand Down Expand Up @@ -66,6 +67,7 @@ export class QueueBase extends EventEmitter implements MinimalQueue {
});

const queueKeys = new QueueKeys(opts.prefix);
this.qualifiedName = queueKeys.getQueueQualifiedName(name);
this.keys = queueKeys.getKeys(name);
this.toKey = (type: string) => queueKeys.toKey(name, type);
this.scripts = new Scripts(this);
Expand Down
10 changes: 3 additions & 7 deletions src/classes/queue-keys.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,8 @@ export class QueueKeys {
'',
'active',
'wait',
'waiting',
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

some names were removed as we don't use or save those key names

'waiting-children',
'paused',
'resumed',
'id',
'delayed',
'prioritized',
Expand All @@ -21,11 +20,8 @@ export class QueueKeys {
'stalled',
'repeat',
'limiter',
'drained',
'progress',
'meta',
'events',
'delay',
'pc',
].forEach(key => {
keys[key] = this.toKey(name, key);
Expand All @@ -35,10 +31,10 @@ export class QueueKeys {
}

toKey(name: string, type: string): string {
return `${this.getPrefixedQueueName(name)}:${type}`;
return `${this.getQueueQualifiedName(name)}:${type}`;
}

getPrefixedQueueName(name: string): string {
getQueueQualifiedName(name: string): string {
return `${this.prefix}:${name}`;
}
}
1 change: 1 addition & 0 deletions src/types/minimal-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ export type MinimalQueue = Pick<
| 'toKey'
| 'keys'
| 'opts'
| 'qualifiedName'
| 'closing'
| 'waitUntilReady'
| 'removeListener'
Expand Down