
Update tests to run with multiple Elasticsearch versions #123

Merged: 8 commits, merged Mar 23, 2017
6 changes: 1 addition & 5 deletions .travis.yml
@@ -31,18 +31,14 @@ before_cache:
 
 env:
   global:
-    - TASKS="cov setup-check" TEST_ARGS="-rsxX -v"
+    - TASKS="cov setup-check" TEST_ARGS="-rsxX -v --es-tag=1.7 --es-tag=2.4 --es-tag=5.2"
   matrix:
     - PYTHONASYNCIODEBUG=1
     - PYTHONASYNCIODEBUG=
 matrix:
   include:
     - python: "3.6"
       env: TASKS="cmp"
-    - python: "3.6"
-      env: TASKS="cov" TEST_ARGS="-rsxX -v --es_tag=5.2"
-  allow_failures:
-    - env: TASKS="cov" TEST_ARGS="-rsxX -v --es_tag=5.2"
 
 deploy:
   provider: pypi
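The widened TEST_ARGS passes repeated --es-tag options to pytest, one per Elasticsearch version the suite should exercise (1.7, 2.4 and 5.2). The fixture code that consumes the option is not part of this diff; a minimal sketch of how such an option could be wired up in conftest.py, assuming a per-tag `es_tag` fixture name, looks like this:

```python
# Hypothetical conftest.py sketch: the --es-tag option name comes from
# .travis.yml above, everything else (fixture name, fallback tag) is assumed
# purely for illustration.

def pytest_addoption(parser):
    # "append" lets the flag repeat: --es-tag=1.7 --es-tag=2.4 --es-tag=5.2
    parser.addoption('--es-tag', action='append', default=[],
                     help='Elasticsearch version tag to run the tests against')


def pytest_generate_tests(metafunc):
    # Any test that requests an `es_tag` fixture runs once per configured tag.
    if 'es_tag' in metafunc.fixturenames:
        tags = metafunc.config.getoption('--es-tag') or ['5.2']
        metafunc.parametrize('es_tag', tags)
```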
31 changes: 23 additions & 8 deletions aioes/client/__init__.py
@@ -130,7 +130,9 @@ def index(self, index, doc_type, body, id=None, *,
         if percolate is not default:
             params['percolate'] = percolate
         if refresh is not default:
-            params['refresh'] = bool(refresh)
+            if refresh != 'wait_for':
+                refresh = str(bool(refresh)).lower()
+            params['refresh'] = refresh
         if replication is not default:
             if not isinstance(replication, str):
                 raise TypeError("'replication' parameter is not a string")
@@ -275,7 +277,9 @@ def get_source(self, index, id, doc_type='_all', *,
         if realtime is not default:
             params['realtime'] = bool(realtime)
         if refresh is not default:
-            params['refresh'] = bool(refresh)
+            if refresh != 'wait_for':
+                refresh = str(bool(refresh)).lower()
+            params['refresh'] = refresh
         if routing is not default:
             params['routing'] = routing
         if version is not default:
@@ -326,7 +330,9 @@ def update(self, index, doc_type, id, body=None, *,
         if parent is not default:
             params['parent'] = parent
         if refresh is not default:
-            params['refresh'] = bool(refresh)
+            if refresh != 'wait_for':
+                refresh = str(bool(refresh)).lower()
+            params['refresh'] = refresh
         if replication is not default:
             if not isinstance(replication, str):
                 raise TypeError("'replication' parameter is not a string")
@@ -374,7 +380,7 @@ def mget(self, body, index=None, doc_type=None, *,
              _source=default, _source_exclude=default,
              _source_include=default, fields=default, parent=default,
              preference=default, realtime=default, refresh=default,
-             routing=default):
+             routing=default, stored_fields=default):
         """
         Get multiple documents based on an index, type (optional) and ids.
         """
@@ -397,6 +403,8 @@ def mget(self, body, index=None, doc_type=None, *,
             params['refresh'] = bool(refresh)
         if routing is not default:
             params['routing'] = routing
+        if stored_fields is not default:
+            params['stored_fields'] = stored_fields
 
         _, data = yield from self.transport.perform_request(
             'GET',
@@ -420,7 +428,8 @@ def search(self, index=None, doc_type=None, body=None, *,
               sort=default, source=default, stats=default,
               suggest_field=default, suggest_mode=default,
               suggest_size=default, suggest_text=default,
-              timeout=default, version=default):
+              timeout=default, version=default,
+              stored_fields=default):
         """
         Execute a search query and get back search hits that match the query.
         """
@@ -483,6 +492,8 @@ def search(self, index=None, doc_type=None, body=None, *,
             params['version'] = int(version)
         if analyzer is not default:
             params['analyzer'] = analyzer
+        if stored_fields is not default:
+            params['stored_fields'] = stored_fields
 
         if expand_wildcards is not default:
             if not isinstance(expand_wildcards, str):
@@ -636,7 +647,7 @@ def explain(self, index, doc_type, id, body=None, *,
                df=default, fields=default, lenient=default,
                lowercase_expanded_terms=default, parent=default,
                preference=default, q=default, routing=default,
-               source=default):
+               source=default, stored_fields=default):
         """
         The explain api computes a score explanation for a query and a
         specific document. This can give useful feedback whether a document
@@ -671,6 +682,8 @@ def explain(self, index, doc_type, id, body=None, *,
             params['routing'] = routing
         if source is not default:
             params['source'] = source
+        if stored_fields is not default:
+            params['stored_fields'] = stored_fields
         if default_operator is not default:
             if not isinstance(default_operator, str):
                 raise TypeError("'default_operator' parameter is not a string")
@@ -747,7 +760,9 @@ def delete(self, index, doc_type=None, id=None, *,
         if parent is not default:
             params['parent'] = parent
         if refresh is not default:
-            params['refresh'] = bool(refresh)
+            if refresh != 'wait_for':
+                refresh = str(bool(refresh)).lower()
+            params['refresh'] = refresh
         if routing is not default:
             params['routing'] = routing
         if version is not default:
@@ -831,7 +846,7 @@ def bulk(self, body, index=None, doc_type=None, *,
                 raise ValueError("'consistency' parameter should be one of "
                                  "'one', 'quorum', 'all'")
         if refresh is not default:
-            params['refresh'] = bool(refresh)
+            params['refresh'] = str(bool(refresh)).lower()
         if routing is not default:
             params['routing'] = routing
         if replication is not default:
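Elasticsearch 5.x accepts true, false and wait_for for the ?refresh query parameter, and it rejects Python's capitalized True/False, so each write-path method now passes 'wait_for' through unchanged and lowercases the boolean otherwise. The same normalization the diff repeats inline, written as a standalone helper purely for illustration:

```python
# Illustration only: aioes keeps the three-line version inline in each method.
def normalize_refresh(refresh):
    """Return the string form Elasticsearch 5.x expects for ?refresh."""
    if refresh == 'wait_for':
        return refresh                     # 5.x-only value, passed through as-is
    return str(bool(refresh)).lower()      # True -> 'true', False -> 'false'


assert normalize_refresh(True) == 'true'
assert normalize_refresh(False) == 'false'
assert normalize_refresh('wait_for') == 'wait_for'
```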
15 changes: 14 additions & 1 deletion aioes/client/cluster.py
@@ -12,7 +12,9 @@ class ClusterClient(NamespacedClient):
     def health(self, index=None, *,
                level=default, local=default, master_timeout=default,
                timeout=default, wait_for_active_shards=default,
-               wait_for_nodes=default, wait_for_relocating_shards=default,
+               wait_for_nodes=default,
+               wait_for_relocating_shards=default,
+               wait_for_no_relocating_shards=default,
                wait_for_status=default):
         """
         Get a very simple status on the health of the cluster.
@@ -56,8 +58,19 @@ def health(self, index=None, *,
         if wait_for_nodes is not default:
             params['wait_for_nodes'] = str(wait_for_nodes)
         if wait_for_relocating_shards is not default:
+            if wait_for_no_relocating_shards is not default:
+                raise ValueError("Either wait_for_relocating_shards or"
+                                 " wait_for_no_relocating_shards must be set,"
+                                 " not both")
             params['wait_for_relocating_shards'] = \
                 int(wait_for_relocating_shards)
+        if wait_for_no_relocating_shards is not default:
+            if wait_for_relocating_shards is not default:
+                raise ValueError("Either wait_for_relocating_shards or"
+                                 " wait_for_no_relocating_shards must be set,"
+                                 " not both")
+            params['wait_for_no_relocating_shards'] = \
+                int(wait_for_no_relocating_shards)
         if wait_for_status is not default:
             if not isinstance(wait_for_status, str):
                 raise TypeError("'wait_for_status' parameter is not a string")
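Elasticsearch 5.0 replaced the wait_for_relocating_shards=0 idiom with a boolean wait_for_no_relocating_shards, so health() now accepts both spellings but raises ValueError if they are combined. A usage sketch, assuming an aioes Elasticsearch client `es` created elsewhere:

```python
import asyncio


@asyncio.coroutine
def wait_until_stable(es):
    # New 5.x flag: block until shard relocation has finished.
    health = yield from es.cluster.health(wait_for_no_relocating_shards=True)
    return health['status']

# Combining the old and the new spelling raises ValueError:
#     es.cluster.health(wait_for_relocating_shards=0,
#                       wait_for_no_relocating_shards=True)
```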
51 changes: 49 additions & 2 deletions aioes/client/indices.py
@@ -13,14 +13,23 @@ class IndicesClient(NamespacedClient):
     def analyze(self, index=None, body=None, *,
                 analyzer=default, char_filters=default, field=default,
                 filters=default, prefer_local=default, text=default,
-                tokenizer=default, token_filters=default):
+                tokenizer=default, token_filters=default,
+                # Es 5.x params
+                filter=default, token_filter=default, char_filter=default):
         """Run analyze tool.
 
         Perform the analysis process on a text and return the tokens
         breakdown of the text.
 
         """
         params = {}
+        duplicate_err = "Either {0}s or {0} must be set, not both".format
+        if filters is not default and filter is not default:
+            raise ValueError(duplicate_err('filter'))
+        if char_filters is not default and char_filter is not default:
+            raise ValueError(duplicate_err('char_filter'))
+        if token_filters is not default and token_filter is not default:
+            raise ValueError(duplicate_err('token_filter'))
         if analyzer is not default:
             params['analyzer'] = analyzer
         if char_filters is not default:
@@ -37,6 +46,12 @@ def analyze(self, index=None, body=None, *,
             params['tokenizer'] = tokenizer
         if token_filters is not default:
             params['token_filters'] = token_filters
+        if filter is not default:
+            params['filter'] = filter
+        if char_filter is not default:
+            params['char_filter'] = char_filter
+        if token_filter is not default:
+            params['token_filter'] = token_filter
 
         _, data = yield from self.transport.perform_request(
             'GET',
@@ -970,7 +985,7 @@ def stats(self, index=None, *, metric=default,
                              "'segments', 'store', 'warmer'")
 
         _, data = yield from self.transport.perform_request(
-            'GET', _make_path(index, '_stats', metric),
+            'GET', _make_path(index, '_stats'),
             params=params)
         return data
 
@@ -1046,6 +1061,38 @@ def optimize(self, index=None, *,
             'POST', _make_path(index, '_optimize'), params=params)
         return data
 
+    @asyncio.coroutine
+    def force_merge(self, index=None, *,
+                    flush=default, allow_no_indices=default,
+                    expand_wildcards=default,
+                    ignore_unavailable=default,
+                    max_num_segments=default,
+                    only_expunge_deletes=default):
+        """Force merging one or more indices through an API."""
+        params = {}
+        if flush is not default:
+            params['flush'] = bool(flush)
+        if max_num_segments is not default:
+            params['max_num_segments'] = int(max_num_segments)
+        if only_expunge_deletes is not default:
+            params['only_expunge_deletes'] = bool(only_expunge_deletes)
+        if allow_no_indices is not default:
+            params['allow_no_indices'] = bool(allow_no_indices)
+        if expand_wildcards is not default:
+            if not isinstance(expand_wildcards, str):
+                raise TypeError("'expand_wildcards' parameter is not a string")
+            elif expand_wildcards.lower() in ('open', 'closed'):
+                params['expand_wildcards'] = expand_wildcards.lower()
+            else:
+                raise ValueError("'expand_wildcards' parameter should be one"
+                                 " of 'open', 'closed'")
+        if ignore_unavailable is not default:
+            params['ignore_unavailable'] = bool(ignore_unavailable)
+
+        _, data = yield from self.transport.perform_request(
+            'POST', _make_path(index, '_forcemerge'), params=params)
+        return data
+
     @asyncio.coroutine
     def validate_query(self, index=None, doc_type=None, body=None, *,
                        explain=default, allow_no_indices=default,
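The _optimize endpoint was renamed _forcemerge in Elasticsearch 5.x, hence the new force_merge() coroutine next to the existing optimize(); analyze() likewise gains the 5.x filter/char_filter/token_filter spellings while refusing to mix them with the old plural forms. A usage sketch for force_merge(), assuming an aioes Elasticsearch client `es` and a 5.x cluster:

```python
import asyncio


@asyncio.coroutine
def compact(es, index):
    # Merge every shard of `index` down to a single segment, flushing afterwards.
    return (yield from es.indices.force_merge(index,
                                              max_num_segments=1,
                                              flush=True))
```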
5 changes: 4 additions & 1 deletion aioes/client/nodes.py
@@ -121,7 +121,8 @@ def stats(self, node_id=None, metric=None, index_metric=None, *,
 
     @asyncio.coroutine
     def hot_threads(self, node_id=None, *, type_=default, interval=default,
-                    snapshots=default, threads=default):
+                    snapshots=default, threads=default,
+                    ignore_idle_threads=default):
         """
         An API allowing to get the current hot threads on each node
         in the cluster.
@@ -148,6 +149,8 @@ def hot_threads(self, node_id=None, *, type_=default, interval=default,
             params['snapshots'] = snapshots
         if threads is not default:
             params['threads'] = threads
+        if ignore_idle_threads is not default:
+            params['ignore_idle_threads'] = str(ignore_idle_threads).lower()
 
         _, data = yield from self.transport.perform_request(
             'GET', _make_path('_nodes', node_id, 'hot_threads'),
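hot_threads() gains ignore_idle_threads, which Elasticsearch expects as a lowercase 'true'/'false' string, hence the str(...).lower() conversion. A usage sketch, assuming an aioes client `es`:

```python
import asyncio


@asyncio.coroutine
def dump_hot_threads(es):
    # The server-side default is true; sending false also includes threads
    # that are idle in well-known wait spots in the report.
    return (yield from es.nodes.hot_threads(threads=3,
                                            ignore_idle_threads=False))
```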
7 changes: 6 additions & 1 deletion aioes/pool.py
@@ -15,9 +15,14 @@ def select(self, connections):
 
 
 class RandomSelector(AbstractSelector):
+    random = random
+
+    def __init__(self, seed=None):
+        if seed is not None:
+            self.random = random.Random(seed)
+
     def select(self, connections):
-        return random.choice(connections)
+        return self.random.choice(connections)
 
 
 class RoundRobinSelector(AbstractSelector):
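RandomSelector now keeps a private random.Random instance when a seed is given, so test runs against several Elasticsearch versions can make the connection choice reproducible; without a seed it falls back to the module-level random via the class attribute. A minimal sketch of the effect (the connection stand-ins are made up for the example):

```python
from aioes.pool import RandomSelector

conns = ['node-a', 'node-b', 'node-c']

# Two selectors with the same seed pick the same sequence of connections.
sel_1, sel_2 = RandomSelector(seed=42), RandomSelector(seed=42)
assert [sel_1.select(conns) for _ in range(5)] == \
       [sel_2.select(conns) for _ in range(5)]

# An unseeded selector behaves as before, using the shared module-level random.
print(RandomSelector().select(conns))
```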
10 changes: 7 additions & 3 deletions aioes/transport.py
@@ -188,7 +188,7 @@ def sniff_endpoints(self):
                 _, headers, node_info = yield from asyncio.wait_for(
                     c.perform_request(
                         'GET',
-                        '/_nodes/_all/clear',
+                        '/_nodes/_all/http',
                         None,
                         None),
                     timeout=self._sniffer_timeout,
@@ -208,9 +208,13 @@ def sniff_endpoints(self):
                 raise
 
         endpoints = []
-        address = 'http_address'
         for n in node_info['nodes'].values():
-            match = self.ADDRESS_RE.search(n.get(address, ''))
+            # try several fields
+            http_addr_1 = n.get('http_address', '')
+            http_addr_2 = n.get('http', {}).get('publish_address', '')
+            match = self.ADDRESS_RE.search(http_addr_1)
+            if not match:
+                match = self.ADDRESS_RE.search(http_addr_2)
             if not match:
                 continue
 
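Sniffing now requests only the http metric group instead of the 1.x-era clear option, and it copes with both places the publish address has lived: the top-level http_address field on 1.x and http.publish_address on 2.x/5.x. A simplified illustration of the two node-info shapes, using a stand-in pattern rather than the real Transport.ADDRESS_RE:

```python
import re

# Simplified stand-in for Transport.ADDRESS_RE, matching host:port pairs.
ADDRESS_RE = re.compile(r'(?P<host>[\d\.]+):(?P<port>\d+)')

nodes = {
    'es-1.x': {'http_address': 'inet[/127.0.0.1:9200]'},        # example 1.x-style field
    'es-5.x': {'http': {'publish_address': '127.0.0.1:9200'}},  # 2.x/5.x-style field
}

for name, n in nodes.items():
    addr = n.get('http_address', '') or n.get('http', {}).get('publish_address', '')
    match = ADDRESS_RE.search(addr)
    if match:
        print(name, match.group('host'), int(match.group('port')))
```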