Using DruidExceptions in MSQ (changes related to the Broker) (#14534) (#14586)

The MSQ engine now returns correct error codes for invalid user inputs in the query context. It also uses DruidExceptions, with improved error messages, for MSQ-related errors that occur in the Broker.
LakshSingla authored Jul 17, 2023
1 parent 6576819 commit d7dd4cb
Showing 19 changed files with 481 additions and 247 deletions.
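
Before the diff, a note on the error API this commit adopts: generic IAE/ISE throws are replaced with the DruidException builders. Below is a minimal sketch of the three call patterns used in the changed files; the sketch class and its arguments are hypothetical, and only the call shapes visible in this diff are assumed.

import org.apache.druid.error.DruidException;
import org.apache.druid.error.InvalidInput;

// Hypothetical validator illustrating the three error patterns in this commit.
public final class ErrorPatternsSketch
{
  static void checkMaxNumTasks(int maxNumTasks)
  {
    // Pattern 1: bad user input -> a user-facing, invalid-input error.
    if (maxNumTasks < 2) {
      throw InvalidInput.exception("maxNumTasks [%,d] cannot be less than 2", maxNumTasks);
    }
  }

  static DruidException internalBug(Throwable cause)
  {
    // Pattern 2: a violated internal invariant -> a "defensive" error,
    // signalling a bug in Druid rather than a user mistake.
    return DruidException.defensive().build(cause, "This invariant should never be violated");
  }

  static DruidException internalBugLongForm(String offendingKey)
  {
    // Pattern 3: the long form of pattern 2, naming persona and category explicitly.
    return DruidException
        .forPersona(DruidException.Persona.DEVELOPER)
        .ofCategory(DruidException.Category.DEFENSIVE)
        .build("Context key [%s] should only ever be set by Druid itself", offendingKey);
  }
}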
MSQTaskQueryMaker.java
@@ -26,8 +26,8 @@
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.util.Pair;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
@@ -48,7 +48,6 @@
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.segment.DimensionHandlerUtils;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.server.QueryResponse;
@@ -76,10 +75,6 @@

public class MSQTaskQueryMaker implements QueryMaker
{

private static final String DESTINATION_DATASOURCE = "dataSource";
private static final String DESTINATION_REPORT = "taskReport";

public static final String USER_KEY = "__user";

private static final Granularity DEFAULT_SEGMENT_GRANULARITY = Granularities.ALL;
@@ -128,25 +123,31 @@ public QueryResponse<Object[]> runQuery(final DruidQuery druidQuery)
MSQMode.populateDefaultQueryContext(msqMode, nativeQueryContext);
}

final String ctxDestination =
DimensionHandlerUtils.convertObjectToString(MultiStageQueryContext.getDestination(sqlQueryContext));

Object segmentGranularity;
try {
segmentGranularity = Optional.ofNullable(plannerContext.queryContext()
.get(DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY))
.orElse(jsonMapper.writeValueAsString(DEFAULT_SEGMENT_GRANULARITY));
}
catch (JsonProcessingException e) {
throw new IAE("Unable to deserialize the insert granularity. Please retry the query with a valid "
+ "segment graularity");
// This would only be thrown if we are unable to serialize the DEFAULT_SEGMENT_GRANULARITY, which we don't expect
// to happen
throw DruidException.defensive()
.build(
e,
"Unable to deserialize the DEFAULT_SEGMENT_GRANULARITY in MSQTaskQueryMaker. "
+ "This shouldn't have happened since the DEFAULT_SEGMENT_GRANULARITY object is guaranteed to be "
+ "serializable. Please raise an issue in case you are seeing this message while executing a query."
);
}

final int maxNumTasks = MultiStageQueryContext.getMaxNumTasks(sqlQueryContext);

if (maxNumTasks < 2) {
throw new IAE(MultiStageQueryContext.CTX_MAX_NUM_TASKS
+ " cannot be less than 2 since at least 1 controller and 1 worker is necessary.");
throw InvalidInput.exception(
"MSQ context maxNumTasks [%,d] cannot be less than 2, since at least 1 controller and 1 worker is necessary",
maxNumTasks
);
}

// This parameter is used internally for the number of worker tasks only, so we subtract 1
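
As a usage illustration of the maxNumTasks check above: the context key string is the value of MultiStageQueryContext.CTX_MAX_NUM_TASKS referenced in the diff, and the map below is a hypothetical client-side sketch.

import com.google.common.collect.ImmutableMap;
import java.util.Map;

public final class MaxNumTasksSketch
{
  public static void main(String[] args)
  {
    // One task is not enough: MSQ needs 1 controller plus at least 1 worker.
    Map<String, Object> queryContext = ImmutableMap.<String, Object>of("maxNumTasks", 1);
    // Submitting an MSQ query with this context now fails fast with an
    // invalid-input DruidException ("MSQ context maxNumTasks [1] cannot be
    // less than 2, ..."), instead of a generic IAE.
    System.out.println(queryContext);
  }
}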
@@ -202,16 +203,19 @@ public QueryResponse<Object[]> runQuery(final DruidQuery druidQuery)
final MSQDestination destination;

if (targetDataSource != null) {
if (ctxDestination != null && !DESTINATION_DATASOURCE.equals(ctxDestination)) {
throw new IAE("Cannot INSERT with destination [%s]", ctxDestination);
}

Granularity segmentGranularityObject;
try {
segmentGranularityObject = jsonMapper.readValue((String) segmentGranularity, Granularity.class);
}
catch (Exception e) {
throw new ISE("Unable to convert %s to a segment granularity", segmentGranularity);
throw DruidException.defensive()
.build(
e,
"Unable to deserialize the provided segmentGranularity [%s]. "
+ "This is populated internally by Druid and therefore should not occur. "
+ "Please contact the developers if you are seeing this error message.",
segmentGranularity
);
}

final List<String> segmentSortOrder = MultiStageQueryContext.getSortOrder(sqlQueryContext);
@@ -228,16 +232,19 @@ public QueryResponse<Object[]> runQuery(final DruidQuery druidQuery)
replaceTimeChunks
);
} else {
if (ctxDestination != null && !DESTINATION_REPORT.equals(ctxDestination)) {
throw new IAE("Cannot SELECT with destination [%s]", ctxDestination);
}
final MSQSelectDestination msqSelectDestination = MultiStageQueryContext.getSelectDestination(sqlQueryContext);
if (msqSelectDestination.equals(MSQSelectDestination.TASK_REPORT)) {
destination = TaskReportMSQDestination.instance();
} else if (msqSelectDestination.equals(MSQSelectDestination.DURABLE_STORAGE)) {
destination = DurableStorageMSQDestination.instance();
} else {
throw new IAE("Cannot SELECT with destination [%s]", msqSelectDestination.name());
throw InvalidInput.exception(
"Unsupported select destination [%s] provided in the query context. MSQ can currently write the select results to "
+ "[%s] and [%s]",
msqSelectDestination.name(),
MSQSelectDestination.TASK_REPORT.toString(),
MSQSelectDestination.DURABLE_STORAGE.toString()
);
}
}

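For the SELECT branch above, a sketch of choosing between the two supported destinations. The enum values are the ones in the diff; the "selectDestination" context key name is an assumption based on MultiStageQueryContext.getSelectDestination.

import com.google.common.collect.ImmutableMap;
import java.util.Map;

public final class SelectDestinationSketch
{
  public static void main(String[] args)
  {
    // Write SELECT results to the task report (TaskReportMSQDestination) ...
    Map<String, Object> taskReport =
        ImmutableMap.<String, Object>of("selectDestination", "TASK_REPORT");
    // ... or to durable storage (DurableStorageMSQDestination).
    Map<String, Object> durable =
        ImmutableMap.<String, Object>of("selectDestination", "DURABLE_STORAGE");
    // Any other value now raises an invalid-input DruidException that names the
    // supported destinations, rather than the old generic IAE.
    System.out.println(taskReport + " " + durable);
  }
}
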
MSQTaskSqlEngine.java
@@ -30,15 +30,13 @@
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.tools.ValidationException;
import org.apache.calcite.util.Pair;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.error.InvalidSqlInput;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.msq.querykit.QueryKitUtils;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
@@ -60,7 +58,6 @@ public class MSQTaskSqlEngine implements SqlEngine
public static final Set<String> SYSTEM_CONTEXT_PARAMETERS =
ImmutableSet.<String>builder()
.addAll(NativeSqlEngine.SYSTEM_CONTEXT_PARAMETERS)
.add(MultiStageQueryContext.CTX_DESTINATION)
.add(QueryKitUtils.CTX_TIME_COLUMN_NAME)
.build();

@@ -125,17 +122,17 @@ public boolean featureAvailable(EngineFeature feature, PlannerContext plannerContext
case SCAN_NEEDS_SIGNATURE:
return true;
default:
throw new IAE("Unrecognized feature: %s", feature);
throw SqlEngines.generateUnrecognizedFeatureException(MSQTaskSqlEngine.class.getSimpleName(), feature);
}
}

@Override
public QueryMaker buildQueryMakerForSelect(
final RelRoot relRoot,
final PlannerContext plannerContext
) throws ValidationException
)
{
validateSelect(relRoot.fields, plannerContext);
validateSelect(plannerContext);

return new MSQTaskQueryMaker(
null,
@@ -156,7 +153,7 @@ public QueryMaker buildQueryMakerForInsert(
final String targetDataSource,
final RelRoot relRoot,
final PlannerContext plannerContext
) throws ValidationException
)
{
validateInsert(relRoot.rel, relRoot.fields, plannerContext);

@@ -169,23 +166,31 @@
);
}

private static void validateSelect(
final List<Pair<Integer, String>> fieldMappings,
final PlannerContext plannerContext
) throws ValidationException
/**
* Checks if the SELECT contains {@link DruidSqlInsert#SQL_INSERT_SEGMENT_GRANULARITY} in the context. This is a
* defensive check, because {@link org.apache.druid.sql.calcite.planner.DruidPlanner} should have called
* {@link #validateContext} already.
*/
private static void validateSelect(final PlannerContext plannerContext)
{
if (plannerContext.queryContext().containsKey(DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY)) {
throw new ValidationException(
StringUtils.format("Cannot use \"%s\" without INSERT", DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY)
);
throw DruidException
.forPersona(DruidException.Persona.DEVELOPER)
.ofCategory(DruidException.Category.DEFENSIVE)
.build(
"The SELECT query's context contains invalid parameter [%s] which is supposed to be populated "
+ "by Druid for INSERT queries. If the user is seeing this exception, that means there's a bug in Druid "
+ "that is populating the query context with the segment's granularity.",
DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY
);
}
}

private static void validateInsert(
final RelNode rootRel,
final List<Pair<Integer, String>> fieldMappings,
final PlannerContext plannerContext
) throws ValidationException
)
{
validateNoDuplicateAliases(fieldMappings);

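A sketch of what the rewritten validateSelect guards against; the context map and invocation are hypothetical, while the constant is the one referenced in the diff.

import com.google.common.collect.ImmutableMap;
import java.util.Map;
import org.apache.druid.sql.calcite.parser.DruidSqlInsert;

public final class ValidateSelectSketch
{
  public static void main(String[] args)
  {
    // A SELECT query's context should never carry the INSERT-only granularity
    // key, since the planner populates it for INSERT/REPLACE queries only.
    Map<String, Object> suspiciousContext =
        ImmutableMap.<String, Object>of(DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY, "\"DAY\"");
    // validateSelect(...) now surfaces this as a DEVELOPER-persona, DEFENSIVE-
    // category DruidException (i.e. a Druid bug), not a ValidationException.
    System.out.println(suspiciousContext);
  }
}
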
@@ -199,12 +204,10 @@ private static void validateInsert(
// Validate the __time field has the proper type.
final SqlTypeName timeType = rootRel.getRowType().getFieldList().get(field.left).getType().getSqlTypeName();
if (timeType != SqlTypeName.TIMESTAMP) {
throw new ValidationException(
StringUtils.format(
"Field \"%s\" must be of type TIMESTAMP (was %s)",
ColumnHolder.TIME_COLUMN_NAME,
timeType
)
throw InvalidSqlInput.exception(
"Field [%s] was the wrong type [%s], expected TIMESTAMP",
ColumnHolder.TIME_COLUMN_NAME,
timeType
);
}
}
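
A worked example of the __time type check above; the SQL is hypothetical, and the expected message is the new InvalidSqlInput text from the diff.

public final class TimeTypeSketch
{
  public static void main(String[] args)
  {
    // Hypothetical INSERT whose __time is a VARCHAR literal, not a TIMESTAMP:
    final String badInsert =
        "INSERT INTO w000\n"
        + "SELECT '2000-01-01' AS __time, page FROM wikipedia\n"
        + "PARTITIONED BY DAY";
    // Planning this query now fails with:
    //   "Field [__time] was the wrong type [VARCHAR], expected TIMESTAMP"
    System.out.println(badInsert);
  }
}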
@@ -220,13 +223,18 @@ private static void validateInsert(
);
}
catch (Exception e) {
throw new ValidationException(
StringUtils.format(
"Invalid segmentGranularity: %s",
plannerContext.queryContext().get(DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY)
),
e
);
// This is a defensive check as the DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY in the query context is
// populated by Druid. If the user entered an incorrect granularity, that should have been flagged before reaching
// here
throw DruidException.forPersona(DruidException.Persona.DEVELOPER)
.ofCategory(DruidException.Category.DEFENSIVE)
.build(
e,
"[%s] is not a valid value for [%s]",
plannerContext.queryContext().get(DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY),
DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY
);

}

final boolean hasSegmentGranularity = !Granularities.ALL.equals(segmentGranularity);
@@ -237,11 +245,10 @@
validateLimitAndOffset(rootRel, !hasSegmentGranularity);

if (hasSegmentGranularity && timeFieldIndex < 0) {
throw new ValidationException(
StringUtils.format(
"INSERT queries with segment granularity other than \"all\" must have a \"%s\" field.",
ColumnHolder.TIME_COLUMN_NAME
)
throw InvalidInput.exception(
"The granularity [%s] specified in the PARTITIONED BY clause of the INSERT query is different from ALL. "
+ "Therefore, the query must specify a time column (named __time).",
segmentGranularity
);
}
}
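
Finally, a sketch of the granularity check above (hypothetical SQL): an INSERT partitioned by anything finer than ALL must produce a __time column.

public final class MissingTimeColumnSketch
{
  public static void main(String[] args)
  {
    // Hypothetical INSERT with DAY granularity but no __time in the SELECT list:
    final String missingTime =
        "INSERT INTO w000\n"
        + "SELECT page, added FROM wikipedia\n"
        + "PARTITIONED BY DAY";
    // This now fails with the improved invalid-input message explaining that a
    // non-ALL PARTITIONED BY granularity requires a __time column.
    System.out.println(missingTime);
  }
}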