Skip to content

Commit

Permalink
Perform cleanup on job timeouts (#4681)
Browse files Browse the repository at this point in the history
* move repeated query cancellation error messages to the job serializer

* oerform cleanup on JobTimeoutException and DRY query cancellation exception blocks

* import JobTimeoutException directly from rq

* fix syntax error introduced by mistake

* add missing import
  • Loading branch information
Omer Lachish authored Feb 26, 2020
1 parent 3102e2d commit 9790b07
Show file tree
Hide file tree
Showing 33 changed files with 186 additions and 253 deletions.
2 changes: 2 additions & 0 deletions redash/query_runner/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@

from redash import settings
from redash.utils import json_loads
from rq.timeouts import JobTimeoutException

logger = logging.getLogger(__name__)

__all__ = [
"BaseQueryRunner",
"BaseHTTPQueryRunner",
"InterruptException",
"JobTimeoutException",
"BaseSQLQueryRunner",
"TYPE_DATETIME",
"TYPE_BOOLEAN",
Expand Down
10 changes: 2 additions & 8 deletions redash/query_runner/athena.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,16 +244,10 @@ def run_query(self, query, user):
}
json_data = json_dumps(data, ignore_nan=True)
error = None
except (KeyboardInterrupt, InterruptException):
except Exception:
if cursor.query_id:
cursor.cancel()
error = "Query cancelled by user."
json_data = None
except Exception as ex:
if cursor.query_id:
cursor.cancel()
error = str(ex)
json_data = None
raise

return json_data, error

Expand Down
5 changes: 2 additions & 3 deletions redash/query_runner/axibase_tsd.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,10 +155,9 @@ def run_query(self, query, user):
except SQLException as e:
json_data = None
error = e.content
except (KeyboardInterrupt, InterruptException):
except (KeyboardInterrupt, InterruptException, JobTimeoutException):
sql.cancel_query(query_id)
error = "Query cancelled by user."
json_data = None
raise

return json_data, error

Expand Down
3 changes: 0 additions & 3 deletions redash/query_runner/azure_kusto.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,6 @@ def run_query(self, query, user):
error = err.args[1][0]["error"]["@message"]
except (IndexError, KeyError):
error = err.args[1]
except KeyboardInterrupt:
json_data = None
error = "Query cancelled by user."

return json_data, error

Expand Down
3 changes: 0 additions & 3 deletions redash/query_runner/big_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,9 +332,6 @@ def run_query(self, query, user):
error = json_loads(e.content)["error"]["message"]
else:
error = e.content
except KeyboardInterrupt:
error = "Query cancelled by user."
json_data = None

return json_data, error

Expand Down
77 changes: 36 additions & 41 deletions redash/query_runner/cass.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,47 +93,42 @@ def get_schema(self, get_stats=False):

def run_query(self, query, user):
connection = None
try:
if self.configuration.get("username", "") and self.configuration.get(
"password", ""
):
auth_provider = PlainTextAuthProvider(
username="{}".format(self.configuration.get("username", "")),
password="{}".format(self.configuration.get("password", "")),
)
connection = Cluster(
[self.configuration.get("host", "")],
auth_provider=auth_provider,
port=self.configuration.get("port", ""),
protocol_version=self.configuration.get("protocol", 3),
)
else:
connection = Cluster(
[self.configuration.get("host", "")],
port=self.configuration.get("port", ""),
protocol_version=self.configuration.get("protocol", 3),
)
session = connection.connect()
session.set_keyspace(self.configuration["keyspace"])
session.default_timeout = self.configuration.get("timeout", 10)
logger.debug("Cassandra running query: %s", query)
result = session.execute(query)

column_names = result.column_names

columns = self.fetch_columns([(c, "string") for c in column_names])

rows = [dict(zip(column_names, row)) for row in result]

data = {"columns": columns, "rows": rows}
json_data = json_dumps(data, cls=CassandraJSONEncoder)

error = None
except KeyboardInterrupt:
error = "Query cancelled by user."
json_data = None

return json_data, error

if self.configuration.get("username", "") and self.configuration.get(
"password", ""
):
auth_provider = PlainTextAuthProvider(
username="{}".format(self.configuration.get("username", "")),
password="{}".format(self.configuration.get("password", "")),
)
connection = Cluster(
[self.configuration.get("host", "")],
auth_provider=auth_provider,
port=self.configuration.get("port", ""),
protocol_version=self.configuration.get("protocol", 3),
)
else:
connection = Cluster(
[self.configuration.get("host", "")],
port=self.configuration.get("port", ""),
protocol_version=self.configuration.get("protocol", 3),
)
session = connection.connect()
session.set_keyspace(self.configuration["keyspace"])
session.default_timeout = self.configuration.get("timeout", 10)
logger.debug("Cassandra running query: %s", query)
result = session.execute(query)

column_names = result.column_names

columns = self.fetch_columns([(c, "string") for c in column_names])

rows = [dict(zip(column_names, row)) for row in result]

data = {"columns": columns, "rows": rows}
json_data = json_dumps(data, cls=CassandraJSONEncoder)

return json_data, None


class ScyllaDB(Cassandra):
Expand Down
11 changes: 4 additions & 7 deletions redash/query_runner/couchbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,15 +148,12 @@ def call_service(self, query, user):
raise Exception("Couchbase connection error")

def run_query(self, query, user):
try:
result = self.call_service(query, user)
result = self.call_service(query, user)

rows, columns = parse_results(result.json()["results"])
data = {"columns": columns, "rows": rows}
rows, columns = parse_results(result.json()["results"])
data = {"columns": columns, "rows": rows}

return json_dumps(data), None
except KeyboardInterrupt:
return None, "Query cancelled by user."
return json_dumps(data), None

@classmethod
def name(cls):
Expand Down
5 changes: 2 additions & 3 deletions redash/query_runner/db2.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,9 @@ def run_query(self, query, user):
except ibm_db_dbi.DatabaseError as e:
error = str(e)
json_data = None
except (KeyboardInterrupt, InterruptException):
except (KeyboardInterrupt, InterruptException, JobTimeoutException):
connection.cancel()
error = "Query cancelled by user."
json_data = None
raise
finally:
connection.close()

Expand Down
19 changes: 8 additions & 11 deletions redash/query_runner/drill.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,20 +94,17 @@ def configuration_schema(cls):
def run_query(self, query, user):
drill_url = os.path.join(self.configuration["url"], "query.json")

try:
payload = {"queryType": "SQL", "query": query}
payload = {"queryType": "SQL", "query": query}

response, error = self.get_response(
drill_url, http_method="post", json=payload
)
if error is not None:
return None, error
response, error = self.get_response(
drill_url, http_method="post", json=payload
)
if error is not None:
return None, error

results = parse_response(response.json())
results = parse_response(response.json())

return json_dumps(results), None
except KeyboardInterrupt:
return None, "Query cancelled by user."
return json_dumps(results), None

def get_schema(self, get_stats=False):

Expand Down
8 changes: 2 additions & 6 deletions redash/query_runner/dynamodb_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,14 +133,10 @@ def run_query(self, query, user):
e.lineno, e.column, e.line
)
json_data = None
except (SyntaxError, RuntimeError) as e:
error = str(e)
json_data = None
except KeyboardInterrupt:
except (KeyboardInterrupt, JobTimeoutException):
if engine and engine.connection:
engine.connection.cancel()
error = "Query cancelled by user."
json_data = None
raise

return json_data, error

Expand Down
8 changes: 2 additions & 6 deletions redash/query_runner/elasticsearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -437,9 +437,6 @@ def run_query(self, query, user):
raise Exception("Advanced queries are not supported")

json_data = json_dumps({"columns": result_columns, "rows": result_rows})
except KeyboardInterrupt:
error = "Query cancelled by user."
json_data = None
except requests.HTTPError as e:
logger.exception(e)
error = "Failed to execute query. Return Code: {0} Reason: {1}".format(
Expand Down Expand Up @@ -497,10 +494,9 @@ def run_query(self, query, user):
)

json_data = json_dumps({"columns": result_columns, "rows": result_rows})
except KeyboardInterrupt:
except (KeyboardInterrupt, JobTimeoutException):
logger.exception(e)
error = "Query cancelled by user."
json_data = None
raise
except requests.HTTPError as e:
logger.exception(e)
error = "Failed to execute query. Return Code: {0} Reason: {1}".format(
Expand Down
5 changes: 2 additions & 3 deletions redash/query_runner/hive_ds.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,10 @@ def run_query(self, query, user):
data = {"columns": columns, "rows": rows}
json_data = json_dumps(data)
error = None
except KeyboardInterrupt:
except (KeyboardInterrupt, JobTimeoutException):
if connection:
connection.cancel()
error = "Query cancelled by user."
json_data = None
raise
except DatabaseError as e:
try:
error = e.args[0].status.errorMessage
Expand Down
5 changes: 2 additions & 3 deletions redash/query_runner/impala_ds.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,9 @@ def run_query(self, query, user):
except RPCError as e:
json_data = None
error = "Metastore Error [%s]" % str(e)
except KeyboardInterrupt:
except (KeyboardInterrupt, JobTimeoutException):
connection.cancel()
error = "Query cancelled by user."
json_data = None
raise
finally:
if connection:
connection.close()
Expand Down
59 changes: 28 additions & 31 deletions redash/query_runner/jql.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,44 +166,41 @@ def __init__(self, configuration):
def run_query(self, query, user):
jql_url = "{}/rest/api/2/search".format(self.configuration["url"])

try:
query = json_loads(query)
query_type = query.pop("queryType", "select")
field_mapping = FieldMapping(query.pop("fieldMapping", {}))

if query_type == "count":
query["maxResults"] = 1
query["fields"] = ""
else:
query["maxResults"] = query.get("maxResults", 1000)
query = json_loads(query)
query_type = query.pop("queryType", "select")
field_mapping = FieldMapping(query.pop("fieldMapping", {}))

response, error = self.get_response(jql_url, params=query)
if error is not None:
return None, error
if query_type == "count":
query["maxResults"] = 1
query["fields"] = ""
else:
query["maxResults"] = query.get("maxResults", 1000)

data = response.json()
response, error = self.get_response(jql_url, params=query)
if error is not None:
return None, error

if query_type == "count":
results = parse_count(data)
else:
results = parse_issues(data, field_mapping)
index = data["startAt"] + data["maxResults"]
data = response.json()

while data["total"] > index:
query["startAt"] = index
response, error = self.get_response(jql_url, params=query)
if error is not None:
return None, error
if query_type == "count":
results = parse_count(data)
else:
results = parse_issues(data, field_mapping)
index = data["startAt"] + data["maxResults"]

while data["total"] > index:
query["startAt"] = index
response, error = self.get_response(jql_url, params=query)
if error is not None:
return None, error

data = response.json()
index = data["startAt"] + data["maxResults"]
data = response.json()
index = data["startAt"] + data["maxResults"]

addl_results = parse_issues(data, field_mapping)
results.merge(addl_results)
addl_results = parse_issues(data, field_mapping)
results.merge(addl_results)

return results.to_json(), None
except KeyboardInterrupt:
return None, "Query cancelled by user."
return results.to_json(), None


register(JiraJQL)
Loading

0 comments on commit 9790b07

Please sign in to comment.