Skip to content
This repository has been archived by the owner on Mar 24, 2021. It is now read-only.

upgrade pytest dependency #839

Merged
merged 8 commits into from
Jul 24, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion test-requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
lz4==2.0.2
lz4tools==1.3.1.2
pytest
pytest==3.6.3
pytest-cov
python-snappy
mock
Expand Down
54 changes: 0 additions & 54 deletions tests/pykafka/__init__.py
Original file line number Diff line number Diff line change
@@ -1,54 +0,0 @@
import pytest


def patch_subclass(parent, skip_condition):
"""Work around a pytest.mark.skipif bug

https://github.com/pytest-dev/pytest/issues/568

The issue causes all subclasses of a TestCase subclass to be skipped if any one
of them is skipped.

This fix circumvents the issue by overriding Python's existing subclassing mechanism.
Instead of having `cls` be a subclass of `parent`, this decorator adds each attribute
of `parent` to `cls` without using Python inheritance. When appropriate, it also adds
a boolean condition under which to skip tests for the decorated class.

:param parent: The "superclass" from which the decorated class should inherit
its non-overridden attributes
:type parent: unittest2.TestCase
:param skip_condition: A boolean condition that, when True, will cause all tests in
the decorated class to be skipped
:type skip_condition: bool
"""
def patcher(cls):
def build_skipped_method(method, cls, cond=None):
if cond is None:
cond = False
if hasattr(method, "skip_condition"):
cond = cond or method.skip_condition(cls)

@pytest.mark.skipif(cond, reason="")
def _wrapper(self):
return method(self)
return _wrapper

# two passes over parent required so that skips have access to all class
# attributes
for attr in parent.__dict__:
if attr in cls.__dict__:
continue
if not attr.startswith("test_"):
setattr(cls, attr, parent.__dict__[attr])

for attr in cls.__dict__:
if attr.startswith("test_"):
setattr(cls, attr, build_skipped_method(cls.__dict__[attr],
cls, skip_condition))

for attr in parent.__dict__:
if attr.startswith("test_"):
setattr(cls, attr, build_skipped_method(parent.__dict__[attr],
cls, skip_condition))
return cls
return patcher
11 changes: 5 additions & 6 deletions tests/pykafka/rdkafka/test_simple_consumer.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import pytest
import unittest2

from tests.pykafka import test_simpleconsumer, test_balancedconsumer, patch_subclass
from tests.pykafka import test_simpleconsumer, test_balancedconsumer
from pykafka.utils.compat import range
try:
from pykafka.rdkafka import _rd_kafka # noqa
Expand All @@ -10,8 +9,8 @@
RDKAFKA = False # C extension not built


@patch_subclass(test_simpleconsumer.TestSimpleConsumer, not RDKAFKA)
class TestRdKafkaSimpleConsumer(unittest2.TestCase):
@pytest.mark.skipif(not RDKAFKA, reason="rdkafka")
class TestRdKafkaSimpleConsumer(test_simpleconsumer.TestSimpleConsumer):
USE_RDKAFKA = True

def test_update_cluster(self):
Expand Down Expand Up @@ -69,6 +68,6 @@ def _latest_partition_offsets_by_reading(consumer, n_reads):
return latest_offs


@patch_subclass(test_balancedconsumer.BalancedConsumerIntegrationTests, not RDKAFKA)
class RdkBalancedConsumerIntegrationTests(unittest2.TestCase):
@pytest.mark.skipif(not RDKAFKA, reason="rdkafka")
class RdkBalancedConsumerIntegrationTests(test_balancedconsumer.BalancedConsumerIntegrationTests):
USE_RDKAFKA = True
29 changes: 16 additions & 13 deletions tests/pykafka/test_balancedconsumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
RangeProtocol)
from pykafka.test.utils import get_cluster, stop_cluster
from pykafka.utils.compat import range, iterkeys, iteritems
from tests.pykafka import patch_subclass


kafka_version_string = os.environ.get('KAFKA_VERSION', '0.8')
Expand Down Expand Up @@ -242,6 +241,8 @@ def test_a_rebalance_unblock_event(self):

https://github.com/Parsely/pykafka/issues/701
"""
if self.USE_GEVENT:
pytest.skip("Unresolved failure")
group = b'test_rebalance'
consumer_a = self.get_balanced_consumer(group, consumer_timeout_ms=-1)

Expand All @@ -264,7 +265,6 @@ def test_a_rebalance_unblock_event(self):

# consumer thread would die in case of any rebalancing errors
self.assertTrue(consumer_a_thread.is_alive() and consumer_b_thread.is_alive())
test_a_rebalance_unblock_event.skip_condition = lambda cls: cls.USE_GEVENT

def test_rebalance_callbacks(self):
def on_rebalance(cns, old_partition_offsets, new_partition_offsets):
Expand Down Expand Up @@ -419,6 +419,8 @@ def test_external_kazoo_client(self):
This currently doesn't assert anything, it just rules out any trivial
exceptions in the code path that uses an external KazooClient
"""
if self.MANAGED_CONSUMER:
pytest.skip("Managed consumer doesn't use zookeeper")
zk = KazooClient(self.kafka.zookeeper)
zk.start()

Expand All @@ -429,7 +431,6 @@ def test_external_kazoo_client(self):
use_rdkafka=self.USE_RDKAFKA)
[msg for msg in consumer]
consumer.stop()
test_external_kazoo_client.skip_condition = lambda cls: cls.MANAGED_CONSUMER

def test_no_partitions(self):
"""Ensure a consumer assigned no partitions doesn't fail"""
Expand Down Expand Up @@ -461,6 +462,8 @@ def test_zk_conn_lost(self):

See also github issue #204.
"""
if self.MANAGED_CONSUMER:
pytest.skip("Managed consumer doesn't use zookeeper")
check_partitions = lambda c: c._get_held_partitions() == c._partitions
zk = self.get_zk()
zk.start()
Expand Down Expand Up @@ -498,7 +501,6 @@ def test_zk_conn_lost(self):
zk.stop()
except:
pass
test_zk_conn_lost.skip_condition = lambda cls: cls.MANAGED_CONSUMER

def wait_for_rebalancing(self, *balanced_consumers):
"""Test helper that loops while rebalancing is ongoing
Expand All @@ -520,21 +522,22 @@ def wait_for_rebalancing(self, *balanced_consumers):
raise AssertionError("Rebalancing failed")


@patch_subclass(BalancedConsumerIntegrationTests,
platform.python_implementation() == "PyPy" or gevent is None)
class BalancedConsumerGEventIntegrationTests(unittest2.TestCase):
@pytest.mark.skipif(platform.python_implementation() == "PyPy" or gevent is None,
reason="Unresolved crashes")
class BalancedConsumerGEventIntegrationTests(BalancedConsumerIntegrationTests):
USE_GEVENT = True


@patch_subclass(BalancedConsumerIntegrationTests, kafka_version < version_09)
class ManagedBalancedConsumerIntegrationTests(unittest2.TestCase):
@pytest.mark.skipif(kafka_version < version_09,
reason="Managed consumer unsupported until 0.9")
class ManagedBalancedConsumerIntegrationTests(BalancedConsumerIntegrationTests):
MANAGED_CONSUMER = True


@patch_subclass(
BalancedConsumerIntegrationTests,
platform.python_implementation() == "PyPy" or kafka_version < version_09 or gevent is None)
class ManagedBalancedConsumerGEventIntegrationTests(unittest2.TestCase):
@pytest.mark.skipif(platform.python_implementation() == "PyPy" or
kafka_version < version_09 or gevent is None,
reason="Unresolved crashes")
class ManagedBalancedConsumerGEventIntegrationTests(BalancedConsumerIntegrationTests):
MANAGED_CONSUMER = True
USE_GEVENT = True

Expand Down
8 changes: 4 additions & 4 deletions tests/pykafka/test_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
from pykafka.common import CompressionType
from pykafka.producer import OwnedBroker
from pykafka.utils import serialize_utf8, deserialize_utf8
from tests.pykafka import patch_subclass

kafka_version = os.environ.get('KAFKA_VERSION', '0.8.0')

Expand Down Expand Up @@ -185,6 +184,8 @@ def test_async_produce_queue_full(self):

def test_async_produce_lingers(self):
"""Ensure that the context manager waits for linger_ms milliseconds"""
if self.USE_RDKAFKA:
pytest.skip("rdkafka uses different lingering mechanism")
linger = 3
consumer = self._get_consumer()
with self._get_producer(linger_ms=linger * 1000) as producer:
Expand All @@ -194,7 +195,6 @@ def test_async_produce_lingers(self):
self.assertTrue(int(time.time() - start) >= int(linger))
consumer.consume()
consumer.consume()
test_async_produce_lingers.skip_condition = lambda cls: RDKAFKA

def test_async_produce_thread_exception(self):
"""Ensure that an exception on a worker thread is raised to the main thread"""
Expand Down Expand Up @@ -380,8 +380,8 @@ def ensure_all_messages_consumed():
retry(ensure_all_messages_consumed, retry_time=15)


@patch_subclass(ProducerIntegrationTests, not RDKAFKA)
class TestRdKafkaProducer(unittest2.TestCase):
@pytest.mark.skipif(not RDKAFKA, reason="rdkafka")
class TestRdKafkaProducer(ProducerIntegrationTests):
USE_RDKAFKA = True


Expand Down
3 changes: 2 additions & 1 deletion tests/pykafka/test_simpleconsumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,8 @@ def test_reset_offsets(self):

def test_update_cluster(self):
"""Check that the consumer can initiate cluster updates"""
if self.USE_RDKAFKA:
pytest.skip("Unresolved crashes")
with self._get_simple_consumer() as consumer:
self.assertIsNotNone(consumer.consume())

Expand All @@ -262,7 +264,6 @@ def test_update_cluster(self):
# If the fetcher thread fell over during the cluster update
# process, we'd get an exception here:
self.assertIsNotNone(consumer.consume())
test_update_cluster.skip_condition = lambda cls: RDKAFKA

def test_consumer_lag(self):
"""Ensure that after consuming the entire topic, lag is 0"""
Expand Down
8 changes: 4 additions & 4 deletions tests/pykafka/utils/test_compression.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ def test_snappy_xerial(self):
decoded = compression.decode_snappy(encoded)
self.assertEqual(self.text, decoded)

@pytest.mark.skipif(platform.python_implementation() == "PyPy",
reason="PyPy fails to compress large messages with Snappy")
def test_snappy_large_payload(self):
if platform.python_implementation() == "PyPy":
pytest.skip("PyPy fails to compress large messages with Snappy")
payload = b''.join([uuid4().bytes for i in range(10)])
c = compression.encode_snappy(payload)
self.assertEqual(compression.decode_snappy(c), payload)
Expand All @@ -46,9 +46,9 @@ def test_lz4(self):
decoded = compression.decode_lz4(encoded)
self.assertEqual(self.text, decoded)

@pytest.mark.skipif(platform.python_implementation() == "PyPy",
reason="lz4f is currently unsupported with PyPy")
def test_lz4f(self):
if platform.python_implementation() == "PyPy":
pytest.skip("lz4f is currently unsupported with PyPy")
encoded = lz4f.compressFrame(self.text)
self.assertNotEqual(self.text, encoded)

Expand Down