Skip to content

Commit

Permalink
[enhencement](trino-connector) trino-connector supports push down pro…
Browse files Browse the repository at this point in the history
…jection to connectors (#37874)

Invoke the `applyProjection` method of connectorMetadata` to push the
projection down to the connector. This reduces the amount of data
retrieved by the connector and enhances query performance.

Projection pushdown is particularly important for the BigQuery
connector.
  • Loading branch information
BePPPower authored and dataroaring committed Jul 31, 2024
1 parent c9d873d commit 9fbc89b
Showing 1 changed file with 20 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import com.fasterxml.jackson.databind.Module;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.airlift.concurrent.BoundedExecutor;
import io.airlift.concurrent.MoreFutures;
import io.airlift.concurrent.Threads;
Expand All @@ -66,6 +67,9 @@
import io.trino.spi.connector.ConstraintApplicationResult;
import io.trino.spi.connector.DynamicFilter;
import io.trino.spi.connector.LimitApplicationResult;
import io.trino.spi.connector.ProjectionApplicationResult;
import io.trino.spi.expression.ConnectorExpression;
import io.trino.spi.expression.Variable;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.type.TypeManager;
import io.trino.split.BufferingSplitSource;
Expand Down Expand Up @@ -177,29 +181,22 @@ private void applyPushDown(ConnectorSession connectorSession) {
+ " after pushing down.");
}

// TODO(ftw): push down projection
// Map<String, ColumnHandle> columnHandleMap = source.getTargetTable().getColumnHandleMap();
// Map<String, ColumnHandle> assignments = Maps.newHashMap();
// if (source.getTargetTable().getName().equals("customer")) {
// assignments.put("c_custkey", columnHandleMap.get("c_custkey"));
// assignments.put("c_mktsegment", columnHandleMap.get("c_mktsegment"));
// } else if (source.getTargetTable().getName().equals("orders")) {
// assignments.put("o_orderkey", columnHandleMap.get("o_orderkey"));
// assignments.put("o_custkey", columnHandleMap.get("o_custkey"));
// assignments.put("o_orderdate", columnHandleMap.get("o_orderdate"));
// assignments.put("o_shippriority", columnHandleMap.get("o_shippriority"));
// } else if (source.getTargetTable().getName().equals("lineitem")) {
// assignments.put("l_orderkey", columnHandleMap.get("l_orderkey"));
// assignments.put("l_extendedprice", columnHandleMap.get("l_extendedprice"));
// assignments.put("l_discount", columnHandleMap.get("l_discount"));
// assignments.put("l_shipdate", columnHandleMap.get("l_shipdate"));
// }
// Optional<ProjectionApplicationResult<ConnectorTableHandle>> projectionResult
// = connectorMetadata.applyProjection(connectorSession, source.getTrinoConnectorTableHandle(),
// Lists.newArrayList(), assignments);
// if (projectionResult.isPresent()) {
// source.setTrinoConnectorTableHandle(projectionResult.get().getHandle());
// }
// push down projection
Map<String, ColumnHandle> columnHandleMap = source.getTargetTable().getColumnHandleMap();
Map<String, ColumnMetadata> columnMetadataMap = source.getTargetTable().getColumnMetadataMap();
Map<String, ColumnHandle> assignments = Maps.newLinkedHashMap();
List<ConnectorExpression> projections = Lists.newArrayList();
for (SlotDescriptor slotDescriptor : desc.getSlots()) {
String colName = slotDescriptor.getColumn().getName();
assignments.put(colName, columnHandleMap.get(colName));
projections.add(new Variable(colName, columnMetadataMap.get(colName).getType()));
}
Optional<ProjectionApplicationResult<ConnectorTableHandle>> projectionResult
= connectorMetadata.applyProjection(connectorSession, source.getTrinoConnectorTableHandle(),
projections, assignments);
if (projectionResult.isPresent()) {
source.setTrinoConnectorTableHandle(projectionResult.get().getHandle());
}
}

private SplitSource getTrinoSplitSource(Connector connector, Session session, ConnectorTableHandle table,
Expand Down

0 comments on commit 9fbc89b

Please sign in to comment.