From 088bfa8a2e4674a29041b6af5dad1885cd91dee4 Mon Sep 17 00:00:00 2001 From: Emmett Butler Date: Thu, 19 Jul 2018 21:45:42 +0000 Subject: [PATCH 1/8] upgrade pytest dependency remove hacky test-skipping workaround now that https://github.com/pytest-dev/pytest/issues/568 has been fixed --- test-requirements.txt | 2 +- tests/pykafka/__init__.py | 54 ------------------- tests/pykafka/rdkafka/test_simple_consumer.py | 11 ++-- tests/pykafka/test_balancedconsumer.py | 25 +++++---- tests/pykafka/test_producer.py | 7 ++- tests/pykafka/test_simpleconsumer.py | 2 +- 6 files changed, 22 insertions(+), 79 deletions(-) diff --git a/test-requirements.txt b/test-requirements.txt index cf36367f8..9fa464274 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -1,6 +1,6 @@ lz4==2.0.2 lz4tools==1.3.1.2 -pytest +pytest==3.6.3 pytest-cov python-snappy mock diff --git a/tests/pykafka/__init__.py b/tests/pykafka/__init__.py index 02df3009f..e69de29bb 100644 --- a/tests/pykafka/__init__.py +++ b/tests/pykafka/__init__.py @@ -1,54 +0,0 @@ -import pytest - - -def patch_subclass(parent, skip_condition): - """Work around a pytest.mark.skipif bug - - https://github.com/pytest-dev/pytest/issues/568 - - The issue causes all subclasses of a TestCase subclass to be skipped if any one - of them is skipped. - - This fix circumvents the issue by overriding Python's existing subclassing mechanism. - Instead of having `cls` be a subclass of `parent`, this decorator adds each attribute - of `parent` to `cls` without using Python inheritance. When appropriate, it also adds - a boolean condition under which to skip tests for the decorated class. - - :param parent: The "superclass" from which the decorated class should inherit - its non-overridden attributes - :type parent: unittest2.TestCase - :param skip_condition: A boolean condition that, when True, will cause all tests in - the decorated class to be skipped - :type skip_condition: bool - """ - def patcher(cls): - def build_skipped_method(method, cls, cond=None): - if cond is None: - cond = False - if hasattr(method, "skip_condition"): - cond = cond or method.skip_condition(cls) - - @pytest.mark.skipif(cond, reason="") - def _wrapper(self): - return method(self) - return _wrapper - - # two passes over parent required so that skips have access to all class - # attributes - for attr in parent.__dict__: - if attr in cls.__dict__: - continue - if not attr.startswith("test_"): - setattr(cls, attr, parent.__dict__[attr]) - - for attr in cls.__dict__: - if attr.startswith("test_"): - setattr(cls, attr, build_skipped_method(cls.__dict__[attr], - cls, skip_condition)) - - for attr in parent.__dict__: - if attr.startswith("test_"): - setattr(cls, attr, build_skipped_method(parent.__dict__[attr], - cls, skip_condition)) - return cls - return patcher diff --git a/tests/pykafka/rdkafka/test_simple_consumer.py b/tests/pykafka/rdkafka/test_simple_consumer.py index 9098fbb39..256300498 100644 --- a/tests/pykafka/rdkafka/test_simple_consumer.py +++ b/tests/pykafka/rdkafka/test_simple_consumer.py @@ -1,7 +1,6 @@ import pytest -import unittest2 -from tests.pykafka import test_simpleconsumer, test_balancedconsumer, patch_subclass +from tests.pykafka import test_simpleconsumer, test_balancedconsumer from pykafka.utils.compat import range try: from pykafka.rdkafka import _rd_kafka # noqa @@ -10,8 +9,8 @@ RDKAFKA = False # C extension not built -@patch_subclass(test_simpleconsumer.TestSimpleConsumer, not RDKAFKA) -class TestRdKafkaSimpleConsumer(unittest2.TestCase): +@pytest.mark.skipif(not RDKAFKA) +class TestRdKafkaSimpleConsumer(test_simpleconsumer.TestSimpleConsumer): USE_RDKAFKA = True def test_update_cluster(self): @@ -69,6 +68,6 @@ def _latest_partition_offsets_by_reading(consumer, n_reads): return latest_offs -@patch_subclass(test_balancedconsumer.BalancedConsumerIntegrationTests, not RDKAFKA) -class RdkBalancedConsumerIntegrationTests(unittest2.TestCase): +@pytest.mark.skipif(not RDKAFKA) +class RdkBalancedConsumerIntegrationTests(test_balancedconsumer.BalancedConsumerIntegrationTests): USE_RDKAFKA = True diff --git a/tests/pykafka/test_balancedconsumer.py b/tests/pykafka/test_balancedconsumer.py index 90daead3d..2bb508bdd 100644 --- a/tests/pykafka/test_balancedconsumer.py +++ b/tests/pykafka/test_balancedconsumer.py @@ -25,7 +25,6 @@ RangeProtocol) from pykafka.test.utils import get_cluster, stop_cluster from pykafka.utils.compat import range, iterkeys, iteritems -from tests.pykafka import patch_subclass kafka_version_string = os.environ.get('KAFKA_VERSION', '0.8') @@ -234,6 +233,7 @@ def verify_extras(consumers, extras_count): except: pass + @pytest.mark.skipif(USE_GEVENT) # weird name to ensure test execution order, because there is an unintended # interdependency between test_consume_latest and other tests def test_a_rebalance_unblock_event(self): @@ -264,7 +264,6 @@ def test_a_rebalance_unblock_event(self): # consumer thread would die in case of any rebalancing errors self.assertTrue(consumer_a_thread.is_alive() and consumer_b_thread.is_alive()) - test_a_rebalance_unblock_event.skip_condition = lambda cls: cls.USE_GEVENT def test_rebalance_callbacks(self): def on_rebalance(cns, old_partition_offsets, new_partition_offsets): @@ -413,6 +412,7 @@ def test_consume_latest(self): except: pass + @pytest.mark.skipif(MANAGED_CONSUMER) def test_external_kazoo_client(self): """Run with pre-existing KazooClient instance @@ -429,7 +429,6 @@ def test_external_kazoo_client(self): use_rdkafka=self.USE_RDKAFKA) [msg for msg in consumer] consumer.stop() - test_external_kazoo_client.skip_condition = lambda cls: cls.MANAGED_CONSUMER def test_no_partitions(self): """Ensure a consumer assigned no partitions doesn't fail""" @@ -456,6 +455,7 @@ def _decide_dummy(participants, partitions, consumer_id): # check that stop() succeeds (cf #313 and #392) consumer.stop() + @pytest.mark.skipif(MANAGED_CONSUMER) def test_zk_conn_lost(self): """Check we restore zookeeper nodes correctly after connection loss @@ -498,7 +498,6 @@ def test_zk_conn_lost(self): zk.stop() except: pass - test_zk_conn_lost.skip_condition = lambda cls: cls.MANAGED_CONSUMER def wait_for_rebalancing(self, *balanced_consumers): """Test helper that loops while rebalancing is ongoing @@ -520,21 +519,21 @@ def wait_for_rebalancing(self, *balanced_consumers): raise AssertionError("Rebalancing failed") -@patch_subclass(BalancedConsumerIntegrationTests, - platform.python_implementation() == "PyPy" or gevent is None) -class BalancedConsumerGEventIntegrationTests(unittest2.TestCase): +@pytest.mark.skipif(platform.python_implementation() == "PyPy" or gevent is None, + reason="Unresolved crashes") +class BalancedConsumerGEventIntegrationTests(BalancedConsumerIntegrationTests): USE_GEVENT = True -@patch_subclass(BalancedConsumerIntegrationTests, kafka_version < version_09) -class ManagedBalancedConsumerIntegrationTests(unittest2.TestCase): +@pytest.mark.skipif(kafka_version < version_09, + reason="Managed consumer unsupported until 0.9") +class ManagedBalancedConsumerIntegrationTests(BalancedConsumerIntegrationTests): MANAGED_CONSUMER = True -@patch_subclass( - BalancedConsumerIntegrationTests, - platform.python_implementation() == "PyPy" or kafka_version < version_09 or gevent is None) -class ManagedBalancedConsumerGEventIntegrationTests(unittest2.TestCase): +@pytest.mark.skipif(platform.python_implementation() == "PyPy" or + kafka_version < version_09 or gevent is None) +class ManagedBalancedConsumerGEventIntegrationTests(BalancedConsumerIntegrationTests): MANAGED_CONSUMER = True USE_GEVENT = True diff --git a/tests/pykafka/test_producer.py b/tests/pykafka/test_producer.py index 31e73221b..2218937c2 100644 --- a/tests/pykafka/test_producer.py +++ b/tests/pykafka/test_producer.py @@ -31,7 +31,6 @@ from pykafka.common import CompressionType from pykafka.producer import OwnedBroker from pykafka.utils import serialize_utf8, deserialize_utf8 -from tests.pykafka import patch_subclass kafka_version = os.environ.get('KAFKA_VERSION', '0.8.0') @@ -183,6 +182,7 @@ def test_async_produce_queue_full(self): while consumer.consume() is not None: time.sleep(.05) + @pytest.mark.skipif(RDKAFKA) def test_async_produce_lingers(self): """Ensure that the context manager waits for linger_ms milliseconds""" linger = 3 @@ -194,7 +194,6 @@ def test_async_produce_lingers(self): self.assertTrue(int(time.time() - start) >= int(linger)) consumer.consume() consumer.consume() - test_async_produce_lingers.skip_condition = lambda cls: RDKAFKA def test_async_produce_thread_exception(self): """Ensure that an exception on a worker thread is raised to the main thread""" @@ -380,8 +379,8 @@ def ensure_all_messages_consumed(): retry(ensure_all_messages_consumed, retry_time=15) -@patch_subclass(ProducerIntegrationTests, not RDKAFKA) -class TestRdKafkaProducer(unittest2.TestCase): +@pytest.mark.skipif(not RDKAFKA) +class TestRdKafkaProducer(ProducerIntegrationTests): USE_RDKAFKA = True diff --git a/tests/pykafka/test_simpleconsumer.py b/tests/pykafka/test_simpleconsumer.py index c96fd887c..30bfca155 100644 --- a/tests/pykafka/test_simpleconsumer.py +++ b/tests/pykafka/test_simpleconsumer.py @@ -239,6 +239,7 @@ def test_reset_offsets(self): self.assertEqual(msg.offset, expected_offset) self.assertEqual(consumer.held_offsets[part_id], expected_offset) + @pytest.mark.skipif(RDKAFKA) def test_update_cluster(self): """Check that the consumer can initiate cluster updates""" with self._get_simple_consumer() as consumer: @@ -262,7 +263,6 @@ def test_update_cluster(self): # If the fetcher thread fell over during the cluster update # process, we'd get an exception here: self.assertIsNotNone(consumer.consume()) - test_update_cluster.skip_condition = lambda cls: RDKAFKA def test_consumer_lag(self): """Ensure that after consuming the entire topic, lag is 0""" From f387c24264837b1d6435dcc9831806c1e7bf0458 Mon Sep 17 00:00:00 2001 From: Emmett Butler Date: Thu, 19 Jul 2018 22:36:26 +0000 Subject: [PATCH 2/8] add reason to all new skipifs --- tests/pykafka/rdkafka/test_simple_consumer.py | 4 ++-- tests/pykafka/test_balancedconsumer.py | 9 +++++---- tests/pykafka/test_producer.py | 4 ++-- tests/pykafka/test_simpleconsumer.py | 2 +- 4 files changed, 10 insertions(+), 9 deletions(-) diff --git a/tests/pykafka/rdkafka/test_simple_consumer.py b/tests/pykafka/rdkafka/test_simple_consumer.py index 256300498..ed3489468 100644 --- a/tests/pykafka/rdkafka/test_simple_consumer.py +++ b/tests/pykafka/rdkafka/test_simple_consumer.py @@ -9,7 +9,7 @@ RDKAFKA = False # C extension not built -@pytest.mark.skipif(not RDKAFKA) +@pytest.mark.skipif(not RDKAFKA, reason="rdkafka") class TestRdKafkaSimpleConsumer(test_simpleconsumer.TestSimpleConsumer): USE_RDKAFKA = True @@ -68,6 +68,6 @@ def _latest_partition_offsets_by_reading(consumer, n_reads): return latest_offs -@pytest.mark.skipif(not RDKAFKA) +@pytest.mark.skipif(not RDKAFKA, reason="rdkafka") class RdkBalancedConsumerIntegrationTests(test_balancedconsumer.BalancedConsumerIntegrationTests): USE_RDKAFKA = True diff --git a/tests/pykafka/test_balancedconsumer.py b/tests/pykafka/test_balancedconsumer.py index 2bb508bdd..e93ddd7c2 100644 --- a/tests/pykafka/test_balancedconsumer.py +++ b/tests/pykafka/test_balancedconsumer.py @@ -233,7 +233,7 @@ def verify_extras(consumers, extras_count): except: pass - @pytest.mark.skipif(USE_GEVENT) + @pytest.mark.skipif(USE_GEVENT, reason="Unresolved failure") # weird name to ensure test execution order, because there is an unintended # interdependency between test_consume_latest and other tests def test_a_rebalance_unblock_event(self): @@ -412,7 +412,7 @@ def test_consume_latest(self): except: pass - @pytest.mark.skipif(MANAGED_CONSUMER) + @pytest.mark.skipif(MANAGED_CONSUMER, reason="Managed consumer doesn't use zookeeper") def test_external_kazoo_client(self): """Run with pre-existing KazooClient instance @@ -455,7 +455,7 @@ def _decide_dummy(participants, partitions, consumer_id): # check that stop() succeeds (cf #313 and #392) consumer.stop() - @pytest.mark.skipif(MANAGED_CONSUMER) + @pytest.mark.skipif(MANAGED_CONSUMER, reason="Managed consumer doesn't use zookeeper") def test_zk_conn_lost(self): """Check we restore zookeeper nodes correctly after connection loss @@ -532,7 +532,8 @@ class ManagedBalancedConsumerIntegrationTests(BalancedConsumerIntegrationTests): @pytest.mark.skipif(platform.python_implementation() == "PyPy" or - kafka_version < version_09 or gevent is None) + kafka_version < version_09 or gevent is None, + reason="Unresolved crashes") class ManagedBalancedConsumerGEventIntegrationTests(BalancedConsumerIntegrationTests): MANAGED_CONSUMER = True USE_GEVENT = True diff --git a/tests/pykafka/test_producer.py b/tests/pykafka/test_producer.py index 2218937c2..0dcb1341d 100644 --- a/tests/pykafka/test_producer.py +++ b/tests/pykafka/test_producer.py @@ -182,7 +182,7 @@ def test_async_produce_queue_full(self): while consumer.consume() is not None: time.sleep(.05) - @pytest.mark.skipif(RDKAFKA) + @pytest.mark.skipif(RDKAFKA, reason="rdkafka uses different lingering mechanism") def test_async_produce_lingers(self): """Ensure that the context manager waits for linger_ms milliseconds""" linger = 3 @@ -379,7 +379,7 @@ def ensure_all_messages_consumed(): retry(ensure_all_messages_consumed, retry_time=15) -@pytest.mark.skipif(not RDKAFKA) +@pytest.mark.skipif(not RDKAFKA, reason="rdkafka") class TestRdKafkaProducer(ProducerIntegrationTests): USE_RDKAFKA = True diff --git a/tests/pykafka/test_simpleconsumer.py b/tests/pykafka/test_simpleconsumer.py index 30bfca155..63cc6bee3 100644 --- a/tests/pykafka/test_simpleconsumer.py +++ b/tests/pykafka/test_simpleconsumer.py @@ -239,7 +239,7 @@ def test_reset_offsets(self): self.assertEqual(msg.offset, expected_offset) self.assertEqual(consumer.held_offsets[part_id], expected_offset) - @pytest.mark.skipif(RDKAFKA) + @pytest.mark.skipif(RDKAFKA, reason="Unresolved crashes") def test_update_cluster(self): """Check that the consumer can initiate cluster updates""" with self._get_simple_consumer() as consumer: From 2bd8667c9b5f94b60e5155065358827d1c011a7c Mon Sep 17 00:00:00 2001 From: Emmett Butler Date: Mon, 23 Jul 2018 22:23:03 +0000 Subject: [PATCH 3/8] use skip function instead of method decorator --- tests/pykafka/test_balancedconsumer.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/pykafka/test_balancedconsumer.py b/tests/pykafka/test_balancedconsumer.py index e93ddd7c2..2be77cf1c 100644 --- a/tests/pykafka/test_balancedconsumer.py +++ b/tests/pykafka/test_balancedconsumer.py @@ -412,13 +412,14 @@ def test_consume_latest(self): except: pass - @pytest.mark.skipif(MANAGED_CONSUMER, reason="Managed consumer doesn't use zookeeper") def test_external_kazoo_client(self): """Run with pre-existing KazooClient instance This currently doesn't assert anything, it just rules out any trivial exceptions in the code path that uses an external KazooClient """ + if self.MANAGED_CONSUMER: + pytest.skip("Managed consumer doesn't use zookeeper") zk = KazooClient(self.kafka.zookeeper) zk.start() From a2c312214c3f0be91cedde1fc62ff1f0b0408e0e Mon Sep 17 00:00:00 2001 From: Emmett Butler Date: Mon, 23 Jul 2018 22:27:49 +0000 Subject: [PATCH 4/8] another skipif -> skip --- tests/pykafka/test_balancedconsumer.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/pykafka/test_balancedconsumer.py b/tests/pykafka/test_balancedconsumer.py index 2be77cf1c..d95683377 100644 --- a/tests/pykafka/test_balancedconsumer.py +++ b/tests/pykafka/test_balancedconsumer.py @@ -456,12 +456,13 @@ def _decide_dummy(participants, partitions, consumer_id): # check that stop() succeeds (cf #313 and #392) consumer.stop() - @pytest.mark.skipif(MANAGED_CONSUMER, reason="Managed consumer doesn't use zookeeper") def test_zk_conn_lost(self): """Check we restore zookeeper nodes correctly after connection loss See also github issue #204. """ + if self.MANAGED_CONSUMER: + pytest.skip("Managed consumer doesn't use zookeeper") check_partitions = lambda c: c._get_held_partitions() == c._partitions zk = self.get_zk() zk.start() From e8592b21482a2d5a0a03a47b88c18ac9fce709ba Mon Sep 17 00:00:00 2001 From: Emmett Butler Date: Mon, 23 Jul 2018 22:56:03 +0000 Subject: [PATCH 5/8] skipid -> skip for rdkafka simpleconsumer tst --- tests/pykafka/test_simpleconsumer.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/pykafka/test_simpleconsumer.py b/tests/pykafka/test_simpleconsumer.py index 63cc6bee3..85fdb70d9 100644 --- a/tests/pykafka/test_simpleconsumer.py +++ b/tests/pykafka/test_simpleconsumer.py @@ -239,9 +239,10 @@ def test_reset_offsets(self): self.assertEqual(msg.offset, expected_offset) self.assertEqual(consumer.held_offsets[part_id], expected_offset) - @pytest.mark.skipif(RDKAFKA, reason="Unresolved crashes") def test_update_cluster(self): """Check that the consumer can initiate cluster updates""" + if self.RDKAFKA: + pytest.skip("Unresolved crashes") with self._get_simple_consumer() as consumer: self.assertIsNotNone(consumer.consume()) From b57b9375d8695d1ee737df4265d8002fbcc63ff2 Mon Sep 17 00:00:00 2001 From: Emmett Butler Date: Mon, 23 Jul 2018 23:15:46 +0000 Subject: [PATCH 6/8] use proper RDK flag --- tests/pykafka/test_simpleconsumer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/pykafka/test_simpleconsumer.py b/tests/pykafka/test_simpleconsumer.py index 85fdb70d9..bb26e786d 100644 --- a/tests/pykafka/test_simpleconsumer.py +++ b/tests/pykafka/test_simpleconsumer.py @@ -241,7 +241,7 @@ def test_reset_offsets(self): def test_update_cluster(self): """Check that the consumer can initiate cluster updates""" - if self.RDKAFKA: + if self.USE_RDKAFKA: pytest.skip("Unresolved crashes") with self._get_simple_consumer() as consumer: self.assertIsNotNone(consumer.consume()) From 870b6025f152555363b8b24a637007d335eb3b35 Mon Sep 17 00:00:00 2001 From: Emmett Butler Date: Tue, 24 Jul 2018 00:21:53 +0000 Subject: [PATCH 7/8] skip-f -> skip --- tests/pykafka/test_balancedconsumer.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/pykafka/test_balancedconsumer.py b/tests/pykafka/test_balancedconsumer.py index d95683377..9d31d79fb 100644 --- a/tests/pykafka/test_balancedconsumer.py +++ b/tests/pykafka/test_balancedconsumer.py @@ -233,7 +233,6 @@ def verify_extras(consumers, extras_count): except: pass - @pytest.mark.skipif(USE_GEVENT, reason="Unresolved failure") # weird name to ensure test execution order, because there is an unintended # interdependency between test_consume_latest and other tests def test_a_rebalance_unblock_event(self): @@ -242,6 +241,8 @@ def test_a_rebalance_unblock_event(self): https://github.com/Parsely/pykafka/issues/701 """ + if self.USE_GEVENT: + pytest.skip("Unresolved failure") group = b'test_rebalance' consumer_a = self.get_balanced_consumer(group, consumer_timeout_ms=-1) From f771c04fe078c56f246e1ce75148b6988351a9f7 Mon Sep 17 00:00:00 2001 From: Emmett Butler Date: Tue, 24 Jul 2018 16:36:26 +0000 Subject: [PATCH 8/8] fix last few skips --- tests/pykafka/test_producer.py | 3 ++- tests/pykafka/utils/test_compression.py | 8 ++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/tests/pykafka/test_producer.py b/tests/pykafka/test_producer.py index 0dcb1341d..47a9c709a 100644 --- a/tests/pykafka/test_producer.py +++ b/tests/pykafka/test_producer.py @@ -182,9 +182,10 @@ def test_async_produce_queue_full(self): while consumer.consume() is not None: time.sleep(.05) - @pytest.mark.skipif(RDKAFKA, reason="rdkafka uses different lingering mechanism") def test_async_produce_lingers(self): """Ensure that the context manager waits for linger_ms milliseconds""" + if self.USE_RDKAFKA: + pytest.skip("rdkafka uses different lingering mechanism") linger = 3 consumer = self._get_consumer() with self._get_producer(linger_ms=linger * 1000) as producer: diff --git a/tests/pykafka/utils/test_compression.py b/tests/pykafka/utils/test_compression.py index a5a198509..bcafa5cd1 100644 --- a/tests/pykafka/utils/test_compression.py +++ b/tests/pykafka/utils/test_compression.py @@ -32,9 +32,9 @@ def test_snappy_xerial(self): decoded = compression.decode_snappy(encoded) self.assertEqual(self.text, decoded) - @pytest.mark.skipif(platform.python_implementation() == "PyPy", - reason="PyPy fails to compress large messages with Snappy") def test_snappy_large_payload(self): + if platform.python_implementation() == "PyPy": + pytest.skip("PyPy fails to compress large messages with Snappy") payload = b''.join([uuid4().bytes for i in range(10)]) c = compression.encode_snappy(payload) self.assertEqual(compression.decode_snappy(c), payload) @@ -46,9 +46,9 @@ def test_lz4(self): decoded = compression.decode_lz4(encoded) self.assertEqual(self.text, decoded) - @pytest.mark.skipif(platform.python_implementation() == "PyPy", - reason="lz4f is currently unsupported with PyPy") def test_lz4f(self): + if platform.python_implementation() == "PyPy": + pytest.skip("lz4f is currently unsupported with PyPy") encoded = lz4f.compressFrame(self.text) self.assertNotEqual(self.text, encoded)