Skip to content

Commit

Permalink
Upgrade reqless-core to include default pattern with queue priority p…
Browse files Browse the repository at this point in the history
…atterns
  • Loading branch information
tdg5 committed Aug 29, 2024
1 parent afecea3 commit acf275f
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 8 deletions.
21 changes: 20 additions & 1 deletion reqless/lua/reqless-lib.lua
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
-- Current SHA: 8b6600adb988e7f4922f606798b6ad64c06a245d
-- Current SHA: 54efb0679992da71b576cf16c043e9c9f985f426
-- This is a generated file
-- cjson can't tell an empty array from an empty object, so empty arrays end up
-- encoded as objects. This function makes empty arrays look like empty arrays.
Expand Down Expand Up @@ -2527,6 +2527,7 @@ function ReqlessQueue.counts(now, name)
end
local ReqlessQueuePatterns = {
default_identifiers_default_pattern = '["*"]',
default_priority_pattern = '{"fairly": false, "pattern": ["default"]}',
ns = Reqless.ns .. "qp:",
}
ReqlessQueuePatterns.__index = ReqlessQueuePatterns
Expand Down Expand Up @@ -2599,6 +2600,10 @@ ReqlessQueuePatterns['getPriorityPatterns'] = function(now)
reply = redis.call('lrange', 'qmore:priority', 0, -1)
end

if #reply == 0 then
reply = {ReqlessQueuePatterns.default_priority_pattern}
end

return reply
end

Expand All @@ -2610,7 +2615,21 @@ ReqlessQueuePatterns['setPriorityPatterns'] = function(now, ...)
redis.call('del', key)
-- Clear out the legacy key
redis.call('del', 'qmore:priority')

if #arg > 0 then
-- Check for the default priority pattern and add one if none is given.
local found_default = false
for i = 1, #arg do
local pattern = cjson.decode(arg[i])['pattern']
if #pattern == 1 and pattern[1] == 'default' then
found_default = true
break
end
end
if not found_default then
table.insert(arg, ReqlessQueuePatterns.default_priority_pattern)
end

redis.call('rpush', key, unpack(arg))
end
end
Expand Down
20 changes: 19 additions & 1 deletion reqless/lua/reqless.lua
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
-- Current SHA: 8b6600adb988e7f4922f606798b6ad64c06a245d
-- Current SHA: 54efb0679992da71b576cf16c043e9c9f985f426
-- This is a generated file
local function cjsonArrayDegenerationWorkaround(array)
if #array == 0 then
Expand Down Expand Up @@ -1898,6 +1898,7 @@ function ReqlessQueue.counts(now, name)
end
local ReqlessQueuePatterns = {
default_identifiers_default_pattern = '["*"]',
default_priority_pattern = '{"fairly": false, "pattern": ["default"]}',
ns = Reqless.ns .. "qp:",
}
ReqlessQueuePatterns.__index = ReqlessQueuePatterns
Expand Down Expand Up @@ -1961,14 +1962,31 @@ ReqlessQueuePatterns['getPriorityPatterns'] = function(now)
reply = redis.call('lrange', 'qmore:priority', 0, -1)
end

if #reply == 0 then
reply = {ReqlessQueuePatterns.default_priority_pattern}
end

return reply
end

ReqlessQueuePatterns['setPriorityPatterns'] = function(now, ...)
local key = ReqlessQueuePatterns.ns .. 'priorities'
redis.call('del', key)
redis.call('del', 'qmore:priority')

if #arg > 0 then
local found_default = false
for i = 1, #arg do
local pattern = cjson.decode(arg[i])['pattern']
if #pattern == 1 and pattern[1] == 'default' then
found_default = true
break
end
end
if not found_default then
table.insert(arg, ReqlessQueuePatterns.default_priority_pattern)
end

redis.call('rpush', key, unpack(arg))
end
end
Expand Down
9 changes: 8 additions & 1 deletion reqless/models/queue_priority_pattern.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import List
from typing import Any, List


class QueuePriorityPattern:
Expand All @@ -10,6 +10,13 @@ def __init__(
self.patterns: List[str] = patterns
self.should_distribute_fairly: bool = should_distribute_fairly

def __eq__(self, other: Any) -> bool:
return (
isinstance(other, QueuePriorityPattern)
and other.patterns == self.patterns
and other.should_distribute_fairly == self.should_distribute_fairly
)

def __repr__(self) -> str:
return (
f"<QueuePriorityPattern patterns={self.patterns}"
Expand Down
2 changes: 1 addition & 1 deletion reqless/reqless-core
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@


class TestDynamicPriorityQueueIdentifiersTransformer(TestReqless):
default_queue_priority_patterns = [
QueuePriorityPattern(patterns=["default"], should_distribute_fairly=False),
]

def setUp(self) -> None:
TestReqless.setUp(self)
# We use many queues to reduce the chances that we shuffle a list into
Expand Down Expand Up @@ -279,24 +283,24 @@ def test_get_dynamic_queue_priorities_caches_priorities_for_some_duration(
)
self.client.queue_patterns.set_queue_priority_patterns([])
priorities = subject._get_dynamic_queue_priorities()
self.assertEqual([], priorities)
self.assertEqual(priorities, self.default_queue_priority_patterns)

self.client.queue_patterns.set_queue_priority_patterns(
[
QueuePriorityPattern(
patterns=["default"], should_distribute_fairly=False
patterns=["default"], should_distribute_fairly=True
),
]
)
priorities = subject._get_dynamic_queue_priorities()
self.assertEqual([], priorities)
self.assertEqual(self.default_queue_priority_patterns, priorities)

time.sleep(refresh_frequency / 1000.0)

priorities = subject._get_dynamic_queue_priorities()
self.assertEqual(1, len(priorities))
self.assertEqual(["default"], priorities[0].patterns)
self.assertFalse(priorities[0].should_distribute_fairly)
self.assertTrue(priorities[0].should_distribute_fairly)

def test_with_transforming_queue_resolver(self) -> None:
"""It works with TransformingQueueResolver"""
Expand Down

0 comments on commit acf275f

Please sign in to comment.