Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add DataFrame.es_query() to modify Elasticsearch queries directly #156

Merged
merged 2 commits into from
Apr 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 0 additions & 20 deletions .ci/certs/ca.crt

This file was deleted.

20 changes: 0 additions & 20 deletions .ci/certs/ca.pem

This file was deleted.

20 changes: 0 additions & 20 deletions .ci/certs/testnode_san.crt

This file was deleted.

27 changes: 0 additions & 27 deletions .ci/certs/testnode_san.key

This file was deleted.

33 changes: 5 additions & 28 deletions .ci/run-elasticsearch.sh
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,6 @@ CLUSTER_NAME=${CLUSTER_NAME-${moniker}${suffix}}
HTTP_PORT=${HTTP_PORT-9200}

ELASTIC_PASSWORD=${ELASTIC_PASSWORD-changeme}
SSL_CERT=${SSL_CERT-"${SCRIPT_PATH}/certs/testnode_san.crt"}
SSL_KEY=${SSL_KEY-"${SCRIPT_PATH}/certs/testnode_san.key"}
SSL_CA=${SSL_CA-"${SCRIPT_PATH}/certs/ca.crt"}
SSL_CA_PEM=${SSL_CA-"${SCRIPT_PATH}/certs/ca.pem"}

DETACH=${DETACH-false}
CLEANUP=${CLEANUP-false}
Expand Down Expand Up @@ -122,35 +118,16 @@ if [[ "$ELASTICSEARCH_VERSION" != *oss* ]]; then
environment+=($(cat <<-END
--env ELASTIC_PASSWORD=$ELASTIC_PASSWORD
--env xpack.license.self_generated.type=trial
--env xpack.security.enabled=true
--env xpack.security.http.ssl.enabled=true
--env xpack.security.http.ssl.verification_mode=certificate
--env xpack.security.http.ssl.key=certs/testnode_san.key
--env xpack.security.http.ssl.certificate=certs/testnode_san.crt
--env xpack.security.http.ssl.certificate_authorities=certs/ca.crt
--env xpack.security.transport.ssl.enabled=true
--env xpack.security.transport.ssl.key=certs/testnode_san.key
--env xpack.security.transport.ssl.certificate=certs/testnode_san.crt
--env xpack.security.transport.ssl.certificate_authorities=certs/ca.crt
END
))
volumes+=($(cat <<-END
--volume $SSL_CERT:/usr/share/elasticsearch/config/certs/testnode_san.crt
--volume $SSL_KEY:/usr/share/elasticsearch/config/certs/testnode_san.key
--volume $SSL_CA:/usr/share/elasticsearch/config/certs/ca.crt
--volume $SSL_CA_PEM:/usr/share/elasticsearch/config/certs/ca.pem
--env xpack.security.enabled=false
--env xpack.security.http.ssl.enabled=false
--env xpack.security.transport.ssl.enabled=false
END
))
fi

url="http://$NODE_NAME"
if [[ "$ELASTICSEARCH_VERSION" != *oss* ]]; then
url="https://elastic:$ELASTIC_PASSWORD@$NODE_NAME"
fi

cert_validation_flags="--insecure"
if [[ "$NODE_NAME" == "es1" ]]; then
cert_validation_flags="--cacert /usr/share/elasticsearch/config/certs/ca.pem --resolve ${NODE_NAME}:443:127.0.0.1"
url="http://elastic:$ELASTIC_PASSWORD@$NODE_NAME"
fi

echo -e "\033[34;1mINFO:\033[0m Starting container $NODE_NAME \033[0m"
Expand All @@ -165,7 +142,7 @@ docker run \
--ulimit nofile=65536:65536 \
--ulimit memlock=-1:-1 \
--detach="$DETACH" \
--health-cmd="curl $cert_validation_flags --fail $url:9200/_cluster/health || exit 1" \
--health-cmd="curl --insecure --fail $url:9200/_cluster/health || exit 1" \
--health-interval=2s \
--health-retries=20 \
--health-timeout=2s \
Expand Down
8 changes: 4 additions & 4 deletions .ci/run-tests
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ set -euxo pipefail


TEST_SUITE=${TEST_SUITE-xpack}
NODE_NAME=es1
NODE_NAME=localhost


elasticsearch_image=elasticsearch
elasticsearch_url=https://elastic:changeme@${NODE_NAME}:9200
elasticsearch_url=http://elastic:changeme@${NODE_NAME}:9200
if [[ $TEST_SUITE != "xpack" ]]; then
elasticsearch_image=elasticsearch-${TEST_SUITE}
elasticsearch_url=http://${NODE_NAME}:9200
Expand Down Expand Up @@ -44,14 +44,14 @@ echo -e "\033[1m>>>>> Start [$ELASTICSEARCH_VERSION container] >>>>>>>>>>>>>>>>>

ELASTICSEARCH_VERSION=${elasticsearch_image}:${ELASTICSEARCH_VERSION} \
NODE_NAME=${NODE_NAME} \
NETWORK_NAME=elasticsearch \
NETWORK_NAME=host \
DETACH=true \
bash .ci/run-elasticsearch.sh

echo -e "\033[1m>>>>> Repository specific tests >>>>>>>>>>>>>>>>>>>>>>>>>>>>>\033[0m"

ELASTICSEARCH_CONTAINER=${elasticsearch_image}:${ELASTICSEARCH_VERSION} \
NETWORK_NAME=elasticsearch \
NETWORK_NAME=host \
NODE_NAME=${NODE_NAME} \
ELASTICSEARCH_URL=${elasticsearch_url} \
TEST_SUITE=${TEST_SUITE} \
Expand Down
6 changes: 6 additions & 0 deletions docs/source/reference/api/eland.DataFrame.es_query.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
eland.DataFrame.es_query
sethmlarson marked this conversation as resolved.
Show resolved Hide resolved
========================

.. currentmodule:: eland

.. automethod:: DataFrame.es_query
15 changes: 8 additions & 7 deletions docs/source/reference/dataframe.rst
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,14 @@ Plotting

DataFrame.hist

Elasticsearch Functions
~~~~~~~~~~~~~~~~~~~~~~~
.. autosummary::
:toctree: api/

DataFrame.info_es
DataFrame.es_query

Serialization / IO / conversion
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
.. autosummary::
Expand All @@ -86,10 +94,3 @@ Serialization / IO / conversion
DataFrame.to_csv
DataFrame.to_html
DataFrame.to_string

Elasticsearch utilities
~~~~~~~~~~~~~~~~~~~~~~~
.. autosummary::
:toctree: api/

DataFrame.info_es
56 changes: 56 additions & 0 deletions eland/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,62 @@ def info_es(self):

return buf.getvalue()

def es_query(self, query):
"""Applies an Elasticsearch DSL query to the current DataFrame.

Parameters
----------
query:
Dictionary of the Elasticsearch DSL query to apply

Returns
-------
eland.DataFrame:
eland DataFrame with the query applied

Examples
--------

Apply a `geo-distance query`_ to a dataset with a geo-point column ``geoip.location``.

.. _geo-distance query: https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-geo-distance-query.html

>>> df = ed.DataFrame('localhost', 'ecommerce', columns=['customer_first_name', 'geoip.city_name'])
>>> df.es_query({"bool": {"filter": {"geo_distance": {"distance": "1km", "geoip.location": [55.3, 25.3]}}}}).head()
customer_first_name geoip.city_name
1 Mary Dubai
9 Rabbia Al Dubai
10 Rabbia Al Dubai
22 Mary Dubai
30 Robbie Dubai
<BLANKLINE>
[5 rows x 2 columns]

If using an occurrence like ``must`` or ``filter`` you must
nest it within ``bool``:

.. code-block:: python

# Correct:
df.es_query({
"bool": {
"filter": {...}
}
})

# Incorrect, needs to be nested under 'bool':
df.es_query({
"filter": {...}
})
"""
# Unpack the {'query': ...} which some
# users may use due to documentation.
if not isinstance(query, dict):
raise TypeError("'query' must be of type 'dict'")
if tuple(query) == ("query",):
query = query["query"]
return DataFrame(query_compiler=self._query_compiler.es_query(query))

def _index_summary(self):
# Print index summary e.g.
# Index: 103 entries, 0 to 102
Expand Down
6 changes: 6 additions & 0 deletions eland/filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,3 +178,9 @@ def __init__(self, inline, lang=None, params=None):
if params is not None:
script["params"] = params
self._filter = {"script": {"script": script}}


class QueryFilter(BooleanFilter):
def __init__(self, query):
super().__init__()
self._filter = query
21 changes: 7 additions & 14 deletions eland/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,27 +135,20 @@ def hist_aggs(self, name, field, min_aggs, max_aggs, num_bins):
self._aggs[name] = agg

def to_search_body(self):
if self._query.empty():
if self._aggs:
body = {"aggs": self._aggs}
else:
body = {}
else:
if self._aggs:
body = {"query": self._query.build(), "aggs": self._aggs}
else:
body = {"query": self._query.build()}
body = {}
if self._aggs:
body["aggs"] = self._aggs
if not self._query.empty():
body["query"] = self._query.build()
return body

def to_count_body(self):
if len(self._aggs) > 0:
warnings.warn("Requesting count for agg query {}", self)
if self._query.empty():
body = None
return None
else:
body = {"query": self._query.build()}

return body
return {"query": self._query.build()}

def update_boolean_filter(self, boolean_filter):
if self._query.empty():
Expand Down
4 changes: 4 additions & 0 deletions eland/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from eland import FieldMappings
from eland import Index
from eland import Operations
from eland.filter import QueryFilter


class QueryCompiler:
Expand Down Expand Up @@ -397,6 +398,9 @@ def tail(self, n):

return result

def es_query(self, query):
return self._update_query(QueryFilter(query))

# To/From Pandas
def to_pandas(self, show_progress=False):
"""Converts Eland DataFrame to Pandas DataFrame.
Expand Down
11 changes: 1 addition & 10 deletions eland/tests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,8 @@
# Define client to use in tests
TEST_SUITE = os.environ.get("TEST_SUITE", "xpack")
if TEST_SUITE == "xpack":
print("Running xpack tests requires SSL. Setting up SSL enabled client")
certpath = os.path.abspath(
os.path.join(os.path.dirname(__file__), "../../.ci/certs/ca.crt")
)
print(certpath)
ES_TEST_CLIENT = Elasticsearch(
ELASTICSEARCH_HOST,
http_auth=("elastic", "changeme"),
use_ssl=True,
verify_certs=True,
ca_certs=certpath,
ELASTICSEARCH_HOST, http_auth=("elastic", "changeme"),
)
else:
ES_TEST_CLIENT = Elasticsearch(ELASTICSEARCH_HOST)
Expand Down
Loading