diff --git a/locust/__init__.py b/locust/__init__.py index e53c88e224..4cd40cd234 100644 --- a/locust/__init__.py +++ b/locust/__init__.py @@ -1,3 +1,5 @@ +__version__ = "2.0.0b0" + from gevent import monkey monkey.patch_all() @@ -13,7 +15,6 @@ events = Events() -__version__ = "2.0.0b0" __all__ = ( "SequentialTaskSet", "wait_time", diff --git a/locust/runners.py b/locust/runners.py index 8e107ce4fc..92ea2a0a41 100644 --- a/locust/runners.py +++ b/locust/runners.py @@ -28,6 +28,7 @@ from gevent.pool import Group from . import User +from locust import __version__ from .dispatch import UsersDispatcher from .distribution import weight_users from .exception import RPCError @@ -842,6 +843,13 @@ def client_listener(self): self.connection_broken = False msg.node_id = client_id if msg.type == "client_ready": + if not msg.data: + logger.error(f"An old (pre 2.0) worker tried to connect ({client_id}). That's not going to work.") + continue + elif msg.data != __version__: + logger.warning( + f"A worker ({client_id}) running a different version ({msg.data}) connected, master version is {__version__}" + ) worker_node_id = msg.node_id self.clients[worker_node_id] = WorkerNode(worker_node_id, heartbeat_liveness=HEARTBEAT_LIVENESS) logger.info( @@ -959,7 +967,7 @@ def __init__(self, environment, master_host, master_port): self.client = rpc.Client(master_host, master_port, self.client_id) self.greenlet.spawn(self.heartbeat).link_exception(greenlet_exception_handler) self.greenlet.spawn(self.worker).link_exception(greenlet_exception_handler) - self.client.send(Message("client_ready", None, self.client_id)) + self.client.send(Message("client_ready", __version__, self.client_id)) self.greenlet.spawn(self.stats_reporter).link_exception(greenlet_exception_handler) # register listener for when all users have spawned, and report it to the master node @@ -1092,7 +1100,7 @@ def worker(self): # random delays inherent to distributed systems. additional_wait = int(os.getenv("LOCUST_WORKER_ADDITIONAL_WAIT_BEFORE_READY_AFTER_STOP", 0)) gevent.sleep((self.environment.stop_timeout or 0) + additional_wait) - self.client.send(Message("client_ready", None, self.client_id)) + self.client.send(Message("client_ready", __version__, self.client_id)) self.worker_state = STATE_INIT elif msg.type == "quit": logger.info("Got quit message from master, shutting down...") diff --git a/locust/test/test_runners.py b/locust/test/test_runners.py index 181c656984..654981d52b 100644 --- a/locust/test/test_runners.py +++ b/locust/test/test_runners.py @@ -19,6 +19,7 @@ LoadTestShape, constant, runners, + __version__, ) from locust.env import Environment from locust.exception import ( @@ -1449,14 +1450,14 @@ def get_runner(self, user_classes=None): def test_worker_connect(self): with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server: master = self.get_runner() - server.mocked_send(Message("client_ready", None, "zeh_fake_client1")) + server.mocked_send(Message("client_ready", __version__, "zeh_fake_client1")) self.assertEqual(1, len(master.clients)) self.assertTrue( "zeh_fake_client1" in master.clients, "Could not find fake client in master instance's clients dict" ) - server.mocked_send(Message("client_ready", None, "zeh_fake_client2")) - server.mocked_send(Message("client_ready", None, "zeh_fake_client3")) - server.mocked_send(Message("client_ready", None, "zeh_fake_client4")) + server.mocked_send(Message("client_ready", __version__, "zeh_fake_client2")) + server.mocked_send(Message("client_ready", __version__, "zeh_fake_client3")) + server.mocked_send(Message("client_ready", __version__, "zeh_fake_client4")) self.assertEqual(4, len(master.clients)) server.mocked_send(Message("quit", None, "zeh_fake_client3")) @@ -1465,7 +1466,7 @@ def test_worker_connect(self): def test_worker_stats_report_median(self): with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server: master = self.get_runner() - server.mocked_send(Message("client_ready", None, "fake_client")) + server.mocked_send(Message("client_ready", __version__, "fake_client")) master.stats.get("/", "GET").log(100, 23455) master.stats.get("/", "GET").log(800, 23455) @@ -1482,7 +1483,7 @@ def test_worker_stats_report_median(self): def test_worker_stats_report_with_none_response_times(self): with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server: master = self.get_runner() - server.mocked_send(Message("client_ready", None, "fake_client")) + server.mocked_send(Message("client_ready", __version__, "fake_client")) master.stats.get("/mixed", "GET").log(0, 23455) master.stats.get("/mixed", "GET").log(800, 23455) @@ -1508,7 +1509,7 @@ def test_worker_stats_report_with_none_response_times(self): def test_master_marks_downed_workers_as_missing(self): with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server: master = self.get_runner() - server.mocked_send(Message("client_ready", None, "fake_client")) + server.mocked_send(Message("client_ready", __version__, "fake_client")) sleep(6) # print(master.clients['fake_client'].__dict__) assert master.clients["fake_client"].state == STATE_MISSING @@ -1516,8 +1517,8 @@ def test_master_marks_downed_workers_as_missing(self): def test_last_worker_quitting_stops_test(self): with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server: master = self.get_runner() - server.mocked_send(Message("client_ready", None, "fake_client1")) - server.mocked_send(Message("client_ready", None, "fake_client2")) + server.mocked_send(Message("client_ready", __version__, "fake_client1")) + server.mocked_send(Message("client_ready", __version__, "fake_client2")) master.start(1, 2) server.mocked_send(Message("spawning", None, "fake_client1")) @@ -1537,9 +1538,9 @@ def test_last_worker_quitting_stops_test(self): def test_last_worker_missing_stops_test(self): with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server: master = self.get_runner() - server.mocked_send(Message("client_ready", None, "fake_client1")) - server.mocked_send(Message("client_ready", None, "fake_client2")) - server.mocked_send(Message("client_ready", None, "fake_client3")) + server.mocked_send(Message("client_ready", __version__, "fake_client1")) + server.mocked_send(Message("client_ready", __version__, "fake_client2")) + server.mocked_send(Message("client_ready", __version__, "fake_client3")) master.start(3, 3) server.mocked_send(Message("spawning", None, "fake_client1")) @@ -1583,7 +1584,7 @@ def test_last_worker_missing_stops_test(self): def test_master_total_stats(self): with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server: master = self.get_runner() - server.mocked_send(Message("client_ready", None, "fake_client")) + server.mocked_send(Message("client_ready", __version__, "fake_client")) stats = RequestStats() stats.log_request("GET", "/1", 100, 3546) stats.log_request("GET", "/1", 800, 56743) @@ -1618,7 +1619,7 @@ def test_master_total_stats(self): def test_master_total_stats_with_none_response_times(self): with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server: master = self.get_runner() - server.mocked_send(Message("client_ready", None, "fake_client")) + server.mocked_send(Message("client_ready", __version__, "fake_client")) stats = RequestStats() stats.log_request("GET", "/1", 100, 3546) stats.log_request("GET", "/1", 800, 56743) @@ -1674,7 +1675,7 @@ def test_master_current_response_times(self): master = self.get_runner() self.environment.stats.reset_all() mocked_time.return_value += 1.0234 - server.mocked_send(Message("client_ready", None, "fake_client")) + server.mocked_send(Message("client_ready", __version__, "fake_client")) stats = RequestStats() stats.log_request("GET", "/1", 100, 3546) stats.log_request("GET", "/1", 800, 56743) @@ -1739,7 +1740,7 @@ def my_task(self): with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server: master = self.get_runner(user_classes=[TestUser]) - server.mocked_send(Message("client_ready", None, "zeh_fake_client1")) + server.mocked_send(Message("client_ready", __version__, "zeh_fake_client1")) self.assertEqual(1, len(master.clients)) self.assertTrue( "zeh_fake_client1" in master.clients, "Could not find fake client in master instance's clients dict" @@ -1756,7 +1757,7 @@ def my_task(self): master.clients["zeh_fake_client1"].user_classes_count = {"TestUser": 100} # let another worker connect - server.mocked_send(Message("client_ready", None, "zeh_fake_client2")) + server.mocked_send(Message("client_ready", __version__, "zeh_fake_client2")) self.assertEqual(2, len(master.clients)) sleep(0.1) # give time for messages to be sent to clients self.assertEqual(2, len(server.outbox)) @@ -1805,7 +1806,7 @@ def on_test_start(*a, **kw): run_count[0] += 1 for i in range(5): - server.mocked_send(Message("client_ready", None, "fake_client%i" % i)) + server.mocked_send(Message("client_ready", __version__, "fake_client%i" % i)) master.start(7, 7) self.assertEqual(5, len(server.outbox)) @@ -1842,7 +1843,7 @@ def on_test_stop(*a, **kw): run_count[0] += 1 for i in range(5): - server.mocked_send(Message("client_ready", None, "fake_client%i" % i)) + server.mocked_send(Message("client_ready", __version__, "fake_client%i" % i)) master.start(7, 7) self.assertEqual(5, len(server.outbox)) @@ -1851,7 +1852,7 @@ def on_test_stop(*a, **kw): run_count[0] = 0 for i in range(5): - server.mocked_send(Message("client_ready", None, "fake_client%i" % i)) + server.mocked_send(Message("client_ready", __version__, "fake_client%i" % i)) master.start(7, 7) master.stop() master.quit() @@ -1877,7 +1878,7 @@ def on_test_stop(*a, **kw): run_count[0] += 1 for i in range(5): - server.mocked_send(Message("client_ready", None, "fake_client%i" % i)) + server.mocked_send(Message("client_ready", __version__, "fake_client%i" % i)) master.start(7, 7) self.assertEqual(5, len(server.outbox)) @@ -1923,7 +1924,7 @@ def my_task(self): master = self.get_runner(user_classes=[TestUser]) for i in range(5): - server.mocked_send(Message("client_ready", None, "fake_client%i" % i)) + server.mocked_send(Message("client_ready", __version__, "fake_client%i" % i)) master.start(7, 7) self.assertEqual(5, len(server.outbox)) @@ -1942,7 +1943,7 @@ def my_task(self): master = self.get_runner(user_classes=[TestUser]) for i in range(5): - server.mocked_send(Message("client_ready", None, "fake_client%i" % i)) + server.mocked_send(Message("client_ready", __version__, "fake_client%i" % i)) master.start(2, 2) self.assertEqual(5, len(server.outbox)) @@ -1972,7 +1973,7 @@ def tick(self): with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server: master = self.get_runner(user_classes=[MyUser]) for i in range(5): - server.mocked_send(Message("client_ready", None, "fake_client%i" % i)) + server.mocked_send(Message("client_ready", __version__, "fake_client%i" % i)) # Start the shape_worker self.environment.shape_class.reset_time() @@ -2017,7 +2018,7 @@ def tick(self): with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server: master = self.get_runner(user_classes=[MyUser]) for i in range(5): - server.mocked_send(Message("client_ready", None, "fake_client%i" % i)) + server.mocked_send(Message("client_ready", __version__, "fake_client%i" % i)) # Start the shape_worker self.environment.shape_class.reset_time() @@ -2114,7 +2115,7 @@ def test_master_reset_connection(self): self.assertEqual(0, len(master.clients)) server.mocked_send(Message("client_ready", NETWORK_BROKEN, "fake_client")) self.assertTrue(master.connection_broken) - server.mocked_send(Message("client_ready", None, "fake_client")) + server.mocked_send(Message("client_ready", __version__, "fake_client")) sleep(0.2) self.assertFalse(master.connection_broken) self.assertEqual(1, len(master.clients)) @@ -2134,7 +2135,7 @@ def my_task(self): with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server: master = self.get_runner(user_classes=[MyUser1, MyUser2]) - server.mocked_send(Message("client_ready", None, "fake_client1")) + server.mocked_send(Message("client_ready", __version__, "fake_client1")) master.start(7, 7) self.assertEqual({"MyUser1": 3, "MyUser2": 4}, master.target_user_classes_count) @@ -2684,7 +2685,7 @@ def on_custom_msg(msg, **kw): class TestMessageSerializing(unittest.TestCase): def test_message_serialize(self): - msg = Message("client_ready", None, "my_id") + msg = Message("client_ready", __version__, "my_id") rebuilt = Message.unserialize(msg.serialize()) self.assertEqual(msg.type, rebuilt.type) self.assertEqual(msg.data, rebuilt.data) diff --git a/locust/test/test_stats.py b/locust/test/test_stats.py index e6dda31b98..b183b0be82 100644 --- a/locust/test/test_stats.py +++ b/locust/test/test_stats.py @@ -8,7 +8,7 @@ import gevent import mock import locust -from locust import HttpUser, TaskSet, task, User, constant +from locust import HttpUser, TaskSet, task, User, constant, __version__ from locust.env import Environment from locust.rpc.protocol import Message from locust.stats import CachedResponseTimes, RequestStats, StatsEntry, diff_response_time_dicts, PERCENTILES_TO_REPORT @@ -409,7 +409,7 @@ def test_csv_stats_on_master_from_aggregated_stats(self): greenlet = gevent.spawn(stats_writer) gevent.sleep(_TEST_CSV_STATS_INTERVAL_WAIT_SEC) - server.mocked_send(Message("client_ready", None, "fake_client")) + server.mocked_send(Message("client_ready", __version__, "fake_client")) master.stats.get("/", "GET").log(100, 23455) master.stats.get("/", "GET").log(800, 23455) @@ -481,7 +481,7 @@ def test_requests_csv_quote_escaping(self): with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server: environment = Environment() master = environment.create_master_runner(master_bind_host="*", master_bind_port=0) - server.mocked_send(Message("client_ready", None, "fake_client")) + server.mocked_send(Message("client_ready", __version__, "fake_client")) request_name_dict = { "scenario": "get cashes",