diff --git a/tests/bwc/test_recovery.py b/tests/bwc/test_recovery.py index 330c976e..c3034db8 100644 --- a/tests/bwc/test_recovery.py +++ b/tests/bwc/test_recovery.py @@ -15,18 +15,18 @@ def test(self): self._run_tests( [ UpgradePath('4.2.x', '4.3.x'), - # UpgradePath('4.3.0', 'latest-nightly'), + UpgradePath('4.3.0', 'latest-nightly'), ], [ - self._test_recovery_with_concurrent_indexing, - #self._test_recovery, - #self._test_update_docs, - #self._test_recovery_closed_index, - #self._test_closed_index_during_rolling_upgrade, - #self._test_auto_expand_indices_during_rolling_upgrade, - #self._test_retention_leases_established_when_promoting_primary, - #self._test_closed_index_noop_recovery, - # self._test_relocation_with_concurrent_indexing + self._test_relocation_with_concurrent_indexing, + self._test_recovery, + self._test_update_docs, + self._test_recovery_closed_index, + self._test_closed_index_during_rolling_upgrade, + self._test_auto_expand_indices_during_rolling_upgrade, + self._test_retention_leases_established_when_promoting_primary, + self._test_closed_index_noop_recovery, + self._test_relocation_with_concurrent_indexing ] ) @@ -206,41 +206,43 @@ def _test_relocation_with_concurrent_indexing(self, path, nodes): # remove the replica and guaranteed the primary is placed on the old node c.execute('''alter table doc.test set ( "number_of_replicas"=0, - "routing.allocation.enable"='NONE', + "routing.allocation.enable"='all', "routing.allocation.include._id"=? )''', (old_node_id, )) self._assert_is_green(conn, 'doc', 'test') c.execute('''alter table doc.test set ("routing.allocation.include._id"=?)''', (new_node_id, )) - insert_data(conn, 'doc', 'test', 50) - self._assert_is_green(conn, 'doc', 'test') + # ensure the relocation from old node to new node has occurred; otherwise ensureGreen can + # return true even though shards haven't moved to the new node yet (allocation was throttled). + time.sleep(3) + c.execute('select current_state from sys.allocations where node_id =?', (new_node_id,)) + current_state = c.fetchone()[0] + self.assertEqual(current_state, 'STARTED') + self._assert_is_green(conn, 'doc', 'test') c.execute('refresh table doc.test') - - c.execute('select count(*) from doc.test') - res = c.fetchone() - self.assertEqual(res[0], 60) + self._assert_count_by_node_id(conn, 'doc', 'test', new_node_id, 60) # upgrade the whole cluster to the new version self._upgrade_cluster(cluster, path.to_version, nodes) - c.execute(''' - alter table doc.test set( - "number_of_replicas"=2, - "routing.allocation.include._id"='null') - ''') + c.execute('''alter table doc.test set("number_of_replicas"=2)''') + c.execute('''alter table doc.test reset("routing.allocation.include._id")''') insert_data(conn, 'doc', 'test', 45) - time.sleep(30) + time.sleep(10) self._assert_is_green(conn, 'doc', 'test') c.execute('refresh table doc.test') + time.sleep(5) + c.execute('select id from sys.nodes') + node_ids = c.fetchall() + self.assertEqual(len(node_ids), 3) - c.execute('select count(*) from doc.test') - res = c.fetchone() - self.assertEqual(res[0], 105) + for node_id in node_ids: + self._assert_count_by_node_id(conn, 'doc', 'test', node_id[0], 105) def _test_recovery(self, path, nodes): """