Skip to content

Commit

Permalink
Check version of workers when they connect, warn if there is a mismat…
Browse files Browse the repository at this point in the history
…ch. Refuse 1.x workers to connect. Move __version__ definition to the top of __init__.py (before importing other things), so that submodules can access it.
  • Loading branch information
cyberw committed Jul 5, 2021
1 parent 2e83655 commit a0f8884
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 39 deletions.
3 changes: 2 additions & 1 deletion locust/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
__version__ = "2.0.0b0"

from gevent import monkey

monkey.patch_all()
Expand All @@ -13,7 +15,6 @@

events = Events()

__version__ = "2.0.0b0"
__all__ = (
"SequentialTaskSet",
"wait_time",
Expand Down
12 changes: 10 additions & 2 deletions locust/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from gevent.pool import Group

from . import User
from locust import __version__
from .dispatch import UsersDispatcher
from .distribution import weight_users
from .exception import RPCError
Expand Down Expand Up @@ -842,6 +843,13 @@ def client_listener(self):
self.connection_broken = False
msg.node_id = client_id
if msg.type == "client_ready":
if not msg.data:
logger.error(f"An old (pre 2.0) worker tried to connect ({client_id}). That's not going to work.")
continue
elif msg.data != __version__:
logger.warning(
f"A worker ({client_id}) running a different version ({msg.data}) connected, master version is {__version__}"
)
worker_node_id = msg.node_id
self.clients[worker_node_id] = WorkerNode(worker_node_id, heartbeat_liveness=HEARTBEAT_LIVENESS)
logger.info(
Expand Down Expand Up @@ -959,7 +967,7 @@ def __init__(self, environment, master_host, master_port):
self.client = rpc.Client(master_host, master_port, self.client_id)
self.greenlet.spawn(self.heartbeat).link_exception(greenlet_exception_handler)
self.greenlet.spawn(self.worker).link_exception(greenlet_exception_handler)
self.client.send(Message("client_ready", None, self.client_id))
self.client.send(Message("client_ready", __version__, self.client_id))
self.greenlet.spawn(self.stats_reporter).link_exception(greenlet_exception_handler)

# register listener for when all users have spawned, and report it to the master node
Expand Down Expand Up @@ -1092,7 +1100,7 @@ def worker(self):
# random delays inherent to distributed systems.
additional_wait = int(os.getenv("LOCUST_WORKER_ADDITIONAL_WAIT_BEFORE_READY_AFTER_STOP", 0))
gevent.sleep((self.environment.stop_timeout or 0) + additional_wait)
self.client.send(Message("client_ready", None, self.client_id))
self.client.send(Message("client_ready", __version__, self.client_id))
self.worker_state = STATE_INIT
elif msg.type == "quit":
logger.info("Got quit message from master, shutting down...")
Expand Down
62 changes: 29 additions & 33 deletions locust/test/test_runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,7 @@
from gevent.queue import Queue

import locust
from locust import (
LoadTestShape,
constant,
runners,
)
from locust import LoadTestShape, constant, runners, __version__
from locust.env import Environment
from locust.exception import (
RPCError,
Expand Down Expand Up @@ -1449,14 +1445,14 @@ def get_runner(self, user_classes=None):
def test_worker_connect(self):
with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:
master = self.get_runner()
server.mocked_send(Message("client_ready", None, "zeh_fake_client1"))
server.mocked_send(Message("client_ready", __version__, "zeh_fake_client1"))
self.assertEqual(1, len(master.clients))
self.assertTrue(
"zeh_fake_client1" in master.clients, "Could not find fake client in master instance's clients dict"
)
server.mocked_send(Message("client_ready", None, "zeh_fake_client2"))
server.mocked_send(Message("client_ready", None, "zeh_fake_client3"))
server.mocked_send(Message("client_ready", None, "zeh_fake_client4"))
server.mocked_send(Message("client_ready", __version__, "zeh_fake_client2"))
server.mocked_send(Message("client_ready", __version__, "zeh_fake_client3"))
server.mocked_send(Message("client_ready", __version__, "zeh_fake_client4"))
self.assertEqual(4, len(master.clients))

server.mocked_send(Message("quit", None, "zeh_fake_client3"))
Expand All @@ -1465,7 +1461,7 @@ def test_worker_connect(self):
def test_worker_stats_report_median(self):
with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:
master = self.get_runner()
server.mocked_send(Message("client_ready", None, "fake_client"))
server.mocked_send(Message("client_ready", __version__, "fake_client"))

master.stats.get("/", "GET").log(100, 23455)
master.stats.get("/", "GET").log(800, 23455)
Expand All @@ -1482,7 +1478,7 @@ def test_worker_stats_report_median(self):
def test_worker_stats_report_with_none_response_times(self):
with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:
master = self.get_runner()
server.mocked_send(Message("client_ready", None, "fake_client"))
server.mocked_send(Message("client_ready", __version__, "fake_client"))

master.stats.get("/mixed", "GET").log(0, 23455)
master.stats.get("/mixed", "GET").log(800, 23455)
Expand All @@ -1508,16 +1504,16 @@ def test_worker_stats_report_with_none_response_times(self):
def test_master_marks_downed_workers_as_missing(self):
with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:
master = self.get_runner()
server.mocked_send(Message("client_ready", None, "fake_client"))
server.mocked_send(Message("client_ready", __version__, "fake_client"))
sleep(6)
# print(master.clients['fake_client'].__dict__)
assert master.clients["fake_client"].state == STATE_MISSING

def test_last_worker_quitting_stops_test(self):
with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:
master = self.get_runner()
server.mocked_send(Message("client_ready", None, "fake_client1"))
server.mocked_send(Message("client_ready", None, "fake_client2"))
server.mocked_send(Message("client_ready", __version__, "fake_client1"))
server.mocked_send(Message("client_ready", __version__, "fake_client2"))

master.start(1, 2)
server.mocked_send(Message("spawning", None, "fake_client1"))
Expand All @@ -1537,9 +1533,9 @@ def test_last_worker_quitting_stops_test(self):
def test_last_worker_missing_stops_test(self):
with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:
master = self.get_runner()
server.mocked_send(Message("client_ready", None, "fake_client1"))
server.mocked_send(Message("client_ready", None, "fake_client2"))
server.mocked_send(Message("client_ready", None, "fake_client3"))
server.mocked_send(Message("client_ready", __version__, "fake_client1"))
server.mocked_send(Message("client_ready", __version__, "fake_client2"))
server.mocked_send(Message("client_ready", __version__, "fake_client3"))

master.start(3, 3)
server.mocked_send(Message("spawning", None, "fake_client1"))
Expand Down Expand Up @@ -1583,7 +1579,7 @@ def test_last_worker_missing_stops_test(self):
def test_master_total_stats(self):
with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:
master = self.get_runner()
server.mocked_send(Message("client_ready", None, "fake_client"))
server.mocked_send(Message("client_ready", __version__, "fake_client"))
stats = RequestStats()
stats.log_request("GET", "/1", 100, 3546)
stats.log_request("GET", "/1", 800, 56743)
Expand Down Expand Up @@ -1618,7 +1614,7 @@ def test_master_total_stats(self):
def test_master_total_stats_with_none_response_times(self):
with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:
master = self.get_runner()
server.mocked_send(Message("client_ready", None, "fake_client"))
server.mocked_send(Message("client_ready", __version__, "fake_client"))
stats = RequestStats()
stats.log_request("GET", "/1", 100, 3546)
stats.log_request("GET", "/1", 800, 56743)
Expand Down Expand Up @@ -1674,7 +1670,7 @@ def test_master_current_response_times(self):
master = self.get_runner()
self.environment.stats.reset_all()
mocked_time.return_value += 1.0234
server.mocked_send(Message("client_ready", None, "fake_client"))
server.mocked_send(Message("client_ready", __version__, "fake_client"))
stats = RequestStats()
stats.log_request("GET", "/1", 100, 3546)
stats.log_request("GET", "/1", 800, 56743)
Expand Down Expand Up @@ -1739,7 +1735,7 @@ def my_task(self):

with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:
master = self.get_runner(user_classes=[TestUser])
server.mocked_send(Message("client_ready", None, "zeh_fake_client1"))
server.mocked_send(Message("client_ready", __version__, "zeh_fake_client1"))
self.assertEqual(1, len(master.clients))
self.assertTrue(
"zeh_fake_client1" in master.clients, "Could not find fake client in master instance's clients dict"
Expand All @@ -1756,7 +1752,7 @@ def my_task(self):
master.clients["zeh_fake_client1"].user_classes_count = {"TestUser": 100}

# let another worker connect
server.mocked_send(Message("client_ready", None, "zeh_fake_client2"))
server.mocked_send(Message("client_ready", __version__, "zeh_fake_client2"))
self.assertEqual(2, len(master.clients))
sleep(0.1) # give time for messages to be sent to clients
self.assertEqual(2, len(server.outbox))
Expand Down Expand Up @@ -1805,7 +1801,7 @@ def on_test_start(*a, **kw):
run_count[0] += 1

for i in range(5):
server.mocked_send(Message("client_ready", None, "fake_client%i" % i))
server.mocked_send(Message("client_ready", __version__, "fake_client%i" % i))

master.start(7, 7)
self.assertEqual(5, len(server.outbox))
Expand Down Expand Up @@ -1842,7 +1838,7 @@ def on_test_stop(*a, **kw):
run_count[0] += 1

for i in range(5):
server.mocked_send(Message("client_ready", None, "fake_client%i" % i))
server.mocked_send(Message("client_ready", __version__, "fake_client%i" % i))

master.start(7, 7)
self.assertEqual(5, len(server.outbox))
Expand All @@ -1851,7 +1847,7 @@ def on_test_stop(*a, **kw):

run_count[0] = 0
for i in range(5):
server.mocked_send(Message("client_ready", None, "fake_client%i" % i))
server.mocked_send(Message("client_ready", __version__, "fake_client%i" % i))
master.start(7, 7)
master.stop()
master.quit()
Expand All @@ -1877,7 +1873,7 @@ def on_test_stop(*a, **kw):
run_count[0] += 1

for i in range(5):
server.mocked_send(Message("client_ready", None, "fake_client%i" % i))
server.mocked_send(Message("client_ready", __version__, "fake_client%i" % i))

master.start(7, 7)
self.assertEqual(5, len(server.outbox))
Expand Down Expand Up @@ -1923,7 +1919,7 @@ def my_task(self):
master = self.get_runner(user_classes=[TestUser])

for i in range(5):
server.mocked_send(Message("client_ready", None, "fake_client%i" % i))
server.mocked_send(Message("client_ready", __version__, "fake_client%i" % i))

master.start(7, 7)
self.assertEqual(5, len(server.outbox))
Expand All @@ -1942,7 +1938,7 @@ def my_task(self):
master = self.get_runner(user_classes=[TestUser])

for i in range(5):
server.mocked_send(Message("client_ready", None, "fake_client%i" % i))
server.mocked_send(Message("client_ready", __version__, "fake_client%i" % i))

master.start(2, 2)
self.assertEqual(5, len(server.outbox))
Expand Down Expand Up @@ -1972,7 +1968,7 @@ def tick(self):
with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:
master = self.get_runner(user_classes=[MyUser])
for i in range(5):
server.mocked_send(Message("client_ready", None, "fake_client%i" % i))
server.mocked_send(Message("client_ready", __version__, "fake_client%i" % i))

# Start the shape_worker
self.environment.shape_class.reset_time()
Expand Down Expand Up @@ -2017,7 +2013,7 @@ def tick(self):
with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:
master = self.get_runner(user_classes=[MyUser])
for i in range(5):
server.mocked_send(Message("client_ready", None, "fake_client%i" % i))
server.mocked_send(Message("client_ready", __version__, "fake_client%i" % i))

# Start the shape_worker
self.environment.shape_class.reset_time()
Expand Down Expand Up @@ -2114,7 +2110,7 @@ def test_master_reset_connection(self):
self.assertEqual(0, len(master.clients))
server.mocked_send(Message("client_ready", NETWORK_BROKEN, "fake_client"))
self.assertTrue(master.connection_broken)
server.mocked_send(Message("client_ready", None, "fake_client"))
server.mocked_send(Message("client_ready", __version__, "fake_client"))
sleep(0.2)
self.assertFalse(master.connection_broken)
self.assertEqual(1, len(master.clients))
Expand All @@ -2134,7 +2130,7 @@ def my_task(self):
with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:
master = self.get_runner(user_classes=[MyUser1, MyUser2])

server.mocked_send(Message("client_ready", None, "fake_client1"))
server.mocked_send(Message("client_ready", __version__, "fake_client1"))

master.start(7, 7)
self.assertEqual({"MyUser1": 3, "MyUser2": 4}, master.target_user_classes_count)
Expand Down Expand Up @@ -2684,7 +2680,7 @@ def on_custom_msg(msg, **kw):

class TestMessageSerializing(unittest.TestCase):
def test_message_serialize(self):
msg = Message("client_ready", None, "my_id")
msg = Message("client_ready", __version__, "my_id")
rebuilt = Message.unserialize(msg.serialize())
self.assertEqual(msg.type, rebuilt.type)
self.assertEqual(msg.data, rebuilt.data)
Expand Down
6 changes: 3 additions & 3 deletions locust/test/test_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import gevent
import mock
import locust
from locust import HttpUser, TaskSet, task, User, constant
from locust import HttpUser, TaskSet, task, User, constant, __version__
from locust.env import Environment
from locust.rpc.protocol import Message
from locust.stats import CachedResponseTimes, RequestStats, StatsEntry, diff_response_time_dicts, PERCENTILES_TO_REPORT
Expand Down Expand Up @@ -409,7 +409,7 @@ def test_csv_stats_on_master_from_aggregated_stats(self):
greenlet = gevent.spawn(stats_writer)
gevent.sleep(_TEST_CSV_STATS_INTERVAL_WAIT_SEC)

server.mocked_send(Message("client_ready", None, "fake_client"))
server.mocked_send(Message("client_ready", __version__, "fake_client"))

master.stats.get("/", "GET").log(100, 23455)
master.stats.get("/", "GET").log(800, 23455)
Expand Down Expand Up @@ -481,7 +481,7 @@ def test_requests_csv_quote_escaping(self):
with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:
environment = Environment()
master = environment.create_master_runner(master_bind_host="*", master_bind_port=0)
server.mocked_send(Message("client_ready", None, "fake_client"))
server.mocked_send(Message("client_ready", __version__, "fake_client"))

request_name_dict = {
"scenario": "get cashes",
Expand Down

0 comments on commit a0f8884

Please sign in to comment.