diff --git a/test-requirements.txt b/test-requirements.txt index cf36367f8..9fa464274 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -1,6 +1,6 @@ lz4==2.0.2 lz4tools==1.3.1.2 -pytest +pytest==3.6.3 pytest-cov python-snappy mock diff --git a/tests/pykafka/__init__.py b/tests/pykafka/__init__.py index 02df3009f..e69de29bb 100644 --- a/tests/pykafka/__init__.py +++ b/tests/pykafka/__init__.py @@ -1,54 +0,0 @@ -import pytest - - -def patch_subclass(parent, skip_condition): - """Work around a pytest.mark.skipif bug - - https://github.com/pytest-dev/pytest/issues/568 - - The issue causes all subclasses of a TestCase subclass to be skipped if any one - of them is skipped. - - This fix circumvents the issue by overriding Python's existing subclassing mechanism. - Instead of having `cls` be a subclass of `parent`, this decorator adds each attribute - of `parent` to `cls` without using Python inheritance. When appropriate, it also adds - a boolean condition under which to skip tests for the decorated class. - - :param parent: The "superclass" from which the decorated class should inherit - its non-overridden attributes - :type parent: unittest2.TestCase - :param skip_condition: A boolean condition that, when True, will cause all tests in - the decorated class to be skipped - :type skip_condition: bool - """ - def patcher(cls): - def build_skipped_method(method, cls, cond=None): - if cond is None: - cond = False - if hasattr(method, "skip_condition"): - cond = cond or method.skip_condition(cls) - - @pytest.mark.skipif(cond, reason="") - def _wrapper(self): - return method(self) - return _wrapper - - # two passes over parent required so that skips have access to all class - # attributes - for attr in parent.__dict__: - if attr in cls.__dict__: - continue - if not attr.startswith("test_"): - setattr(cls, attr, parent.__dict__[attr]) - - for attr in cls.__dict__: - if attr.startswith("test_"): - setattr(cls, attr, build_skipped_method(cls.__dict__[attr], - cls, skip_condition)) - - for attr in parent.__dict__: - if attr.startswith("test_"): - setattr(cls, attr, build_skipped_method(parent.__dict__[attr], - cls, skip_condition)) - return cls - return patcher diff --git a/tests/pykafka/rdkafka/test_simple_consumer.py b/tests/pykafka/rdkafka/test_simple_consumer.py index 9098fbb39..ed3489468 100644 --- a/tests/pykafka/rdkafka/test_simple_consumer.py +++ b/tests/pykafka/rdkafka/test_simple_consumer.py @@ -1,7 +1,6 @@ import pytest -import unittest2 -from tests.pykafka import test_simpleconsumer, test_balancedconsumer, patch_subclass +from tests.pykafka import test_simpleconsumer, test_balancedconsumer from pykafka.utils.compat import range try: from pykafka.rdkafka import _rd_kafka # noqa @@ -10,8 +9,8 @@ RDKAFKA = False # C extension not built -@patch_subclass(test_simpleconsumer.TestSimpleConsumer, not RDKAFKA) -class TestRdKafkaSimpleConsumer(unittest2.TestCase): +@pytest.mark.skipif(not RDKAFKA, reason="rdkafka") +class TestRdKafkaSimpleConsumer(test_simpleconsumer.TestSimpleConsumer): USE_RDKAFKA = True def test_update_cluster(self): @@ -69,6 +68,6 @@ def _latest_partition_offsets_by_reading(consumer, n_reads): return latest_offs -@patch_subclass(test_balancedconsumer.BalancedConsumerIntegrationTests, not RDKAFKA) -class RdkBalancedConsumerIntegrationTests(unittest2.TestCase): +@pytest.mark.skipif(not RDKAFKA, reason="rdkafka") +class RdkBalancedConsumerIntegrationTests(test_balancedconsumer.BalancedConsumerIntegrationTests): USE_RDKAFKA = True diff --git a/tests/pykafka/test_balancedconsumer.py b/tests/pykafka/test_balancedconsumer.py index 90daead3d..9d31d79fb 100644 --- a/tests/pykafka/test_balancedconsumer.py +++ b/tests/pykafka/test_balancedconsumer.py @@ -25,7 +25,6 @@ RangeProtocol) from pykafka.test.utils import get_cluster, stop_cluster from pykafka.utils.compat import range, iterkeys, iteritems -from tests.pykafka import patch_subclass kafka_version_string = os.environ.get('KAFKA_VERSION', '0.8') @@ -242,6 +241,8 @@ def test_a_rebalance_unblock_event(self): https://github.com/Parsely/pykafka/issues/701 """ + if self.USE_GEVENT: + pytest.skip("Unresolved failure") group = b'test_rebalance' consumer_a = self.get_balanced_consumer(group, consumer_timeout_ms=-1) @@ -264,7 +265,6 @@ def test_a_rebalance_unblock_event(self): # consumer thread would die in case of any rebalancing errors self.assertTrue(consumer_a_thread.is_alive() and consumer_b_thread.is_alive()) - test_a_rebalance_unblock_event.skip_condition = lambda cls: cls.USE_GEVENT def test_rebalance_callbacks(self): def on_rebalance(cns, old_partition_offsets, new_partition_offsets): @@ -419,6 +419,8 @@ def test_external_kazoo_client(self): This currently doesn't assert anything, it just rules out any trivial exceptions in the code path that uses an external KazooClient """ + if self.MANAGED_CONSUMER: + pytest.skip("Managed consumer doesn't use zookeeper") zk = KazooClient(self.kafka.zookeeper) zk.start() @@ -429,7 +431,6 @@ def test_external_kazoo_client(self): use_rdkafka=self.USE_RDKAFKA) [msg for msg in consumer] consumer.stop() - test_external_kazoo_client.skip_condition = lambda cls: cls.MANAGED_CONSUMER def test_no_partitions(self): """Ensure a consumer assigned no partitions doesn't fail""" @@ -461,6 +462,8 @@ def test_zk_conn_lost(self): See also github issue #204. """ + if self.MANAGED_CONSUMER: + pytest.skip("Managed consumer doesn't use zookeeper") check_partitions = lambda c: c._get_held_partitions() == c._partitions zk = self.get_zk() zk.start() @@ -498,7 +501,6 @@ def test_zk_conn_lost(self): zk.stop() except: pass - test_zk_conn_lost.skip_condition = lambda cls: cls.MANAGED_CONSUMER def wait_for_rebalancing(self, *balanced_consumers): """Test helper that loops while rebalancing is ongoing @@ -520,21 +522,22 @@ def wait_for_rebalancing(self, *balanced_consumers): raise AssertionError("Rebalancing failed") -@patch_subclass(BalancedConsumerIntegrationTests, - platform.python_implementation() == "PyPy" or gevent is None) -class BalancedConsumerGEventIntegrationTests(unittest2.TestCase): +@pytest.mark.skipif(platform.python_implementation() == "PyPy" or gevent is None, + reason="Unresolved crashes") +class BalancedConsumerGEventIntegrationTests(BalancedConsumerIntegrationTests): USE_GEVENT = True -@patch_subclass(BalancedConsumerIntegrationTests, kafka_version < version_09) -class ManagedBalancedConsumerIntegrationTests(unittest2.TestCase): +@pytest.mark.skipif(kafka_version < version_09, + reason="Managed consumer unsupported until 0.9") +class ManagedBalancedConsumerIntegrationTests(BalancedConsumerIntegrationTests): MANAGED_CONSUMER = True -@patch_subclass( - BalancedConsumerIntegrationTests, - platform.python_implementation() == "PyPy" or kafka_version < version_09 or gevent is None) -class ManagedBalancedConsumerGEventIntegrationTests(unittest2.TestCase): +@pytest.mark.skipif(platform.python_implementation() == "PyPy" or + kafka_version < version_09 or gevent is None, + reason="Unresolved crashes") +class ManagedBalancedConsumerGEventIntegrationTests(BalancedConsumerIntegrationTests): MANAGED_CONSUMER = True USE_GEVENT = True diff --git a/tests/pykafka/test_producer.py b/tests/pykafka/test_producer.py index 31e73221b..47a9c709a 100644 --- a/tests/pykafka/test_producer.py +++ b/tests/pykafka/test_producer.py @@ -31,7 +31,6 @@ from pykafka.common import CompressionType from pykafka.producer import OwnedBroker from pykafka.utils import serialize_utf8, deserialize_utf8 -from tests.pykafka import patch_subclass kafka_version = os.environ.get('KAFKA_VERSION', '0.8.0') @@ -185,6 +184,8 @@ def test_async_produce_queue_full(self): def test_async_produce_lingers(self): """Ensure that the context manager waits for linger_ms milliseconds""" + if self.USE_RDKAFKA: + pytest.skip("rdkafka uses different lingering mechanism") linger = 3 consumer = self._get_consumer() with self._get_producer(linger_ms=linger * 1000) as producer: @@ -194,7 +195,6 @@ def test_async_produce_lingers(self): self.assertTrue(int(time.time() - start) >= int(linger)) consumer.consume() consumer.consume() - test_async_produce_lingers.skip_condition = lambda cls: RDKAFKA def test_async_produce_thread_exception(self): """Ensure that an exception on a worker thread is raised to the main thread""" @@ -380,8 +380,8 @@ def ensure_all_messages_consumed(): retry(ensure_all_messages_consumed, retry_time=15) -@patch_subclass(ProducerIntegrationTests, not RDKAFKA) -class TestRdKafkaProducer(unittest2.TestCase): +@pytest.mark.skipif(not RDKAFKA, reason="rdkafka") +class TestRdKafkaProducer(ProducerIntegrationTests): USE_RDKAFKA = True diff --git a/tests/pykafka/test_simpleconsumer.py b/tests/pykafka/test_simpleconsumer.py index c96fd887c..bb26e786d 100644 --- a/tests/pykafka/test_simpleconsumer.py +++ b/tests/pykafka/test_simpleconsumer.py @@ -241,6 +241,8 @@ def test_reset_offsets(self): def test_update_cluster(self): """Check that the consumer can initiate cluster updates""" + if self.USE_RDKAFKA: + pytest.skip("Unresolved crashes") with self._get_simple_consumer() as consumer: self.assertIsNotNone(consumer.consume()) @@ -262,7 +264,6 @@ def test_update_cluster(self): # If the fetcher thread fell over during the cluster update # process, we'd get an exception here: self.assertIsNotNone(consumer.consume()) - test_update_cluster.skip_condition = lambda cls: RDKAFKA def test_consumer_lag(self): """Ensure that after consuming the entire topic, lag is 0""" diff --git a/tests/pykafka/utils/test_compression.py b/tests/pykafka/utils/test_compression.py index a5a198509..bcafa5cd1 100644 --- a/tests/pykafka/utils/test_compression.py +++ b/tests/pykafka/utils/test_compression.py @@ -32,9 +32,9 @@ def test_snappy_xerial(self): decoded = compression.decode_snappy(encoded) self.assertEqual(self.text, decoded) - @pytest.mark.skipif(platform.python_implementation() == "PyPy", - reason="PyPy fails to compress large messages with Snappy") def test_snappy_large_payload(self): + if platform.python_implementation() == "PyPy": + pytest.skip("PyPy fails to compress large messages with Snappy") payload = b''.join([uuid4().bytes for i in range(10)]) c = compression.encode_snappy(payload) self.assertEqual(compression.decode_snappy(c), payload) @@ -46,9 +46,9 @@ def test_lz4(self): decoded = compression.decode_lz4(encoded) self.assertEqual(self.text, decoded) - @pytest.mark.skipif(platform.python_implementation() == "PyPy", - reason="lz4f is currently unsupported with PyPy") def test_lz4f(self): + if platform.python_implementation() == "PyPy": + pytest.skip("lz4f is currently unsupported with PyPy") encoded = lz4f.compressFrame(self.text) self.assertNotEqual(self.text, encoded)