From 11727af2a6c464b57047b1a97ff5053daa6fb0dd Mon Sep 17 00:00:00 2001 From: Sree Charan Manamala Date: Thu, 19 Sep 2024 09:02:12 +0530 Subject: [PATCH] Fix String Frame Readers to read String Arrays correctly (#16885) (#17103) While writing to a frame, String arrays are written by setting the multivalue byte. But while reading, it was hardcoded to false. (cherry picked from commit c7c3307e6193db8ddc879f48bbf3b9e3d1b41a6c) --- .../druid/frame/field/StringFieldReader.java | 5 +- .../read/columnar/FrameColumnReaders.java | 4 +- .../StringArrayFrameColumnReader.java | 385 ++++++++++++++++ .../columnar/StringFrameColumnReader.java | 430 +++++++----------- .../columnar/StringFrameColumnWriter.java | 5 +- .../operator/window/RowsAndColumnsHelper.java | 12 + ...t.java => EvaluateRowsAndColumnsTest.java} | 36 +- 7 files changed, 603 insertions(+), 274 deletions(-) create mode 100644 processing/src/main/java/org/apache/druid/frame/read/columnar/StringArrayFrameColumnReader.java rename processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/{TestVirtualColumnEvaluationRowsAndColumnsTest.java => EvaluateRowsAndColumnsTest.java} (73%) diff --git a/processing/src/main/java/org/apache/druid/frame/field/StringFieldReader.java b/processing/src/main/java/org/apache/druid/frame/field/StringFieldReader.java index 1c51a914e0d5..be2263be138d 100644 --- a/processing/src/main/java/org/apache/druid/frame/field/StringFieldReader.java +++ b/processing/src/main/java/org/apache/druid/frame/field/StringFieldReader.java @@ -480,9 +480,8 @@ public int numRows() public boolean isNull(int rowNum) { final long fieldPosition = coach.computeFieldPosition(rowNum); - byte[] nullBytes = new byte[3]; - dataRegion.getByteArray(fieldPosition, nullBytes, 0, 3); - return Arrays.equals(nullBytes, EXPECTED_BYTES_FOR_NULL); + return dataRegion.getByte(fieldPosition) == StringFieldWriter.NULL_ROW + && dataRegion.getByte(fieldPosition + 1) == StringFieldWriter.ROW_TERMINATOR; } @Override diff --git a/processing/src/main/java/org/apache/druid/frame/read/columnar/FrameColumnReaders.java b/processing/src/main/java/org/apache/druid/frame/read/columnar/FrameColumnReaders.java index 9b4dc85cb1e0..6ceb6b729176 100644 --- a/processing/src/main/java/org/apache/druid/frame/read/columnar/FrameColumnReaders.java +++ b/processing/src/main/java/org/apache/druid/frame/read/columnar/FrameColumnReaders.java @@ -51,7 +51,7 @@ public static FrameColumnReader create( return new DoubleFrameColumnReader(columnNumber); case STRING: - return new StringFrameColumnReader(columnNumber, false); + return new StringFrameColumnReader(columnNumber); case COMPLEX: return new ComplexFrameColumnReader(columnNumber); @@ -59,7 +59,7 @@ public static FrameColumnReader create( case ARRAY: switch (columnType.getElementType().getType()) { case STRING: - return new StringFrameColumnReader(columnNumber, true); + return new StringArrayFrameColumnReader(columnNumber); case LONG: return new LongArrayFrameColumnReader(columnNumber); case FLOAT: diff --git a/processing/src/main/java/org/apache/druid/frame/read/columnar/StringArrayFrameColumnReader.java b/processing/src/main/java/org/apache/druid/frame/read/columnar/StringArrayFrameColumnReader.java new file mode 100644 index 000000000000..31c56bf38e7d --- /dev/null +++ b/processing/src/main/java/org/apache/druid/frame/read/columnar/StringArrayFrameColumnReader.java @@ -0,0 +1,385 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.frame.read.columnar; + +import com.google.common.primitives.Ints; +import it.unimi.dsi.fastutil.objects.ObjectArrays; +import org.apache.datasketches.memory.Memory; +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.error.DruidException; +import org.apache.druid.frame.Frame; +import org.apache.druid.frame.read.FrameReaderUtils; +import org.apache.druid.frame.write.FrameWriterUtils; +import org.apache.druid.frame.write.columnar.FrameColumnWriters; +import org.apache.druid.frame.write.columnar.StringFrameColumnWriter; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; +import org.apache.druid.query.rowsandcols.column.Column; +import org.apache.druid.query.rowsandcols.column.ColumnAccessorBasedColumn; +import org.apache.druid.query.rowsandcols.column.accessor.ObjectColumnAccessorBase; +import org.apache.druid.segment.ColumnValueSelector; +import org.apache.druid.segment.ObjectColumnSelector; +import org.apache.druid.segment.column.BaseColumn; +import org.apache.druid.segment.column.ColumnCapabilitiesImpl; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.data.ReadableOffset; +import org.apache.druid.segment.vector.ReadableVectorInspector; +import org.apache.druid.segment.vector.ReadableVectorOffset; +import org.apache.druid.segment.vector.VectorObjectSelector; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; +import java.util.Comparator; + +/** + * Reader for {@link ColumnType#STRING_ARRAY}. + * This is similar to {@link StringFrameColumnReader} reading mvds in reading bytes from frame + */ +public class StringArrayFrameColumnReader implements FrameColumnReader +{ + private final int columnNumber; + + /** + * Create a new reader. + * + * @param columnNumber column number + */ + StringArrayFrameColumnReader(int columnNumber) + { + this.columnNumber = columnNumber; + } + + @Override + public Column readRACColumn(Frame frame) + { + final Memory memory = frame.region(columnNumber); + validate(memory); + + final long positionOfLengths = getStartOfStringLengthSection(frame.numRows()); + final long positionOfPayloads = getStartOfStringDataSection(memory, frame.numRows()); + + StringArrayFrameColumn frameCol = new StringArrayFrameColumn( + frame, + memory, + positionOfLengths, + positionOfPayloads + ); + + return new ColumnAccessorBasedColumn(frameCol); + } + + @Override + public ColumnPlus readColumn(final Frame frame) + { + final Memory memory = frame.region(columnNumber); + validate(memory); + + final long startOfStringLengthSection = getStartOfStringLengthSection(frame.numRows()); + final long startOfStringDataSection = getStartOfStringDataSection(memory, frame.numRows()); + + final BaseColumn baseColumn = new StringArrayFrameColumn( + frame, + memory, + startOfStringLengthSection, + startOfStringDataSection + ); + + return new ColumnPlus( + baseColumn, + new ColumnCapabilitiesImpl().setType(ColumnType.STRING_ARRAY) + .setHasMultipleValues(false) + .setDictionaryEncoded(false), + frame.numRows() + ); + } + + private void validate(final Memory region) + { + // Check if column is big enough for a header + if (region.getCapacity() < StringFrameColumnWriter.DATA_OFFSET) { + throw DruidException.defensive("Column[%s] is not big enough for a header", columnNumber); + } + + final byte typeCode = region.getByte(0); + if (typeCode != FrameColumnWriters.TYPE_STRING_ARRAY) { + throw DruidException.defensive( + "Column[%s] does not have the correct type code; expected[%s], got[%s]", + columnNumber, + FrameColumnWriters.TYPE_STRING_ARRAY, + typeCode + ); + } + } + + private static long getStartOfCumulativeLengthSection() + { + return StringFrameColumnWriter.DATA_OFFSET; + } + + private static long getStartOfStringLengthSection(final int numRows) + { + return StringFrameColumnWriter.DATA_OFFSET + (long) Integer.BYTES * numRows; + } + + private long getStartOfStringDataSection( + final Memory memory, + final int numRows + ) + { + if (numRows < 0) { + throw DruidException.defensive("Encountered -ve numRows [%s] while reading frame", numRows); + } + final int totalNumValues = FrameColumnReaderUtils.getAdjustedCumulativeRowLength( + memory, + getStartOfCumulativeLengthSection(), + numRows - 1 + ); + + return getStartOfStringLengthSection(numRows) + (long) Integer.BYTES * totalNumValues; + } + + private static class StringArrayFrameColumn extends ObjectColumnAccessorBase implements BaseColumn + { + private final Frame frame; + private final Memory memory; + private final long startOfStringLengthSection; + private final long startOfStringDataSection; + + private StringArrayFrameColumn( + Frame frame, + Memory memory, + long startOfStringLengthSection, + long startOfStringDataSection + ) + { + this.frame = frame; + this.memory = memory; + this.startOfStringLengthSection = startOfStringLengthSection; + this.startOfStringDataSection = startOfStringDataSection; + } + + @Override + public ColumnValueSelector makeColumnValueSelector(ReadableOffset offset) + { + return new ObjectColumnSelector() + { + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + // Do nothing. + } + + @Nullable + @Override + public Object getObject() + { + return getRowAsObject(frame.physicalRow(offset.getOffset()), true); + } + + @Override + public Class classOfObject() + { + return Object[].class; + } + }; + } + + @Override + public VectorObjectSelector makeVectorObjectSelector(final ReadableVectorOffset offset) + { + class StringArrayFrameVectorObjectSelector implements VectorObjectSelector + { + private final Object[] vector = new Object[offset.getMaxVectorSize()]; + private int id = ReadableVectorInspector.NULL_ID; + + @Override + public Object[] getObjectVector() + { + computeVectorIfNeeded(); + return vector; + } + + @Override + public int getMaxVectorSize() + { + return offset.getMaxVectorSize(); + } + + @Override + public int getCurrentVectorSize() + { + return offset.getCurrentVectorSize(); + } + + private void computeVectorIfNeeded() + { + if (id == offset.getId()) { + return; + } + + if (offset.isContiguous()) { + final int start = offset.getStartOffset(); + + for (int i = 0; i < offset.getCurrentVectorSize(); i++) { + final int physicalRow = frame.physicalRow(i + start); + vector[i] = getRowAsObject(physicalRow, true); + } + } else { + final int[] offsets = offset.getOffsets(); + + for (int i = 0; i < offset.getCurrentVectorSize(); i++) { + final int physicalRow = frame.physicalRow(offsets[i]); + vector[i] = getRowAsObject(physicalRow, true); + } + } + + id = offset.getId(); + } + } + + return new StringArrayFrameVectorObjectSelector(); + } + + @Override + public void close() + { + // Do nothing. + } + + @Override + public ColumnType getType() + { + return ColumnType.STRING_ARRAY; + } + + @Override + public int numRows() + { + return frame.numRows(); + } + + @Override + protected Object getVal(int rowNum) + { + return getRowAsObject(frame.physicalRow(rowNum), true); + } + + @Override + protected Comparator getComparator() + { + return Comparator.nullsFirst(ColumnType.STRING_ARRAY.getStrategy()); + } + + /** + * Returns a ByteBuffer containing UTF-8 encoded string number {@code index}. The ByteBuffer is always newly + * created, so it is OK to change its position, limit, etc. However, it may point to shared memory, so it is + * not OK to write to its contents. + */ + @Nullable + private ByteBuffer getStringUtf8(final int index) + { + if (startOfStringLengthSection > Long.MAX_VALUE - (long) Integer.BYTES * index) { + throw DruidException.defensive("length index would overflow trying to read the frame memory!"); + } + + final int dataEndVariableIndex = memory.getInt(startOfStringLengthSection + (long) Integer.BYTES * index); + if (startOfStringDataSection > Long.MAX_VALUE - dataEndVariableIndex) { + throw DruidException.defensive("data end index would overflow trying to read the frame memory!"); + } + + final long dataStart; + final long dataEnd = startOfStringDataSection + dataEndVariableIndex; + + if (index == 0) { + dataStart = startOfStringDataSection; + } else { + final int dataStartVariableIndex = memory.getInt(startOfStringLengthSection + (long) Integer.BYTES * (index + - 1)); + if (startOfStringDataSection > Long.MAX_VALUE - dataStartVariableIndex) { + throw DruidException.defensive("data start index would overflow trying to read the frame memory!"); + } + dataStart = startOfStringDataSection + dataStartVariableIndex; + } + + final int dataLength = Ints.checkedCast(dataEnd - dataStart); + + if ((dataLength == 0 && NullHandling.replaceWithDefault()) || + (dataLength == 1 && memory.getByte(dataStart) == FrameWriterUtils.NULL_STRING_MARKER)) { + return null; + } + + return FrameReaderUtils.readByteBuffer(memory, dataStart, dataLength); + } + + @Nullable + private String getString(final int index) + { + final ByteBuffer stringUtf8 = getStringUtf8(index); + + if (stringUtf8 == null) { + return null; + } else { + return StringUtils.fromUtf8(stringUtf8); + } + } + + /** + * Returns the object at the given physical row number. + * + * @param physicalRow physical row number + * @param decode if true, return java.lang.String. If false, return UTF-8 ByteBuffer. + */ + @Nullable + private Object getRowAsObject(final int physicalRow, final boolean decode) + { + final int cumulativeRowLength = FrameColumnReaderUtils.getCumulativeRowLength( + memory, + getStartOfCumulativeLengthSection(), + physicalRow + ); + final int rowLength; + + if (FrameColumnReaderUtils.isNullRow(cumulativeRowLength)) { + return null; + } else if (physicalRow == 0) { + rowLength = cumulativeRowLength; + } else { + rowLength = cumulativeRowLength - FrameColumnReaderUtils.getAdjustedCumulativeRowLength( + memory, + getStartOfCumulativeLengthSection(), + physicalRow - 1 + ); + } + + if (rowLength == 0) { + return ObjectArrays.EMPTY_ARRAY; + } else { + final Object[] row = new Object[rowLength]; + + for (int i = 0; i < rowLength; i++) { + final int index = cumulativeRowLength - rowLength + i; + row[i] = decode ? getString(index) : getStringUtf8(index); + } + + return row; + } + } + } +} diff --git a/processing/src/main/java/org/apache/druid/frame/read/columnar/StringFrameColumnReader.java b/processing/src/main/java/org/apache/druid/frame/read/columnar/StringFrameColumnReader.java index d9fb9d83a9f4..2385c431e5b3 100644 --- a/processing/src/main/java/org/apache/druid/frame/read/columnar/StringFrameColumnReader.java +++ b/processing/src/main/java/org/apache/druid/frame/read/columnar/StringFrameColumnReader.java @@ -19,18 +19,16 @@ package org.apache.druid.frame.read.columnar; -import com.google.common.annotations.VisibleForTesting; import com.google.common.primitives.Ints; -import it.unimi.dsi.fastutil.objects.ObjectArrays; import org.apache.datasketches.memory.Memory; import org.apache.druid.common.config.NullHandling; import org.apache.druid.error.DruidException; +import org.apache.druid.error.InvalidInput; import org.apache.druid.frame.Frame; import org.apache.druid.frame.read.FrameReaderUtils; import org.apache.druid.frame.write.FrameWriterUtils; import org.apache.druid.frame.write.columnar.FrameColumnWriters; import org.apache.druid.frame.write.columnar.StringFrameColumnWriter; -import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.query.extraction.ExtractionFn; import org.apache.druid.query.filter.DruidPredicateFactory; @@ -40,13 +38,11 @@ import org.apache.druid.query.rowsandcols.column.ColumnAccessorBasedColumn; import org.apache.druid.query.rowsandcols.column.accessor.ObjectColumnAccessorBase; import org.apache.druid.segment.BaseSingleValueDimensionSelector; -import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.DimensionDictionarySelector; import org.apache.druid.segment.DimensionSelector; import org.apache.druid.segment.DimensionSelectorUtils; import org.apache.druid.segment.IdLookup; import org.apache.druid.segment.column.BaseColumn; -import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.ColumnCapabilitiesImpl; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.DictionaryEncodedColumn; @@ -67,23 +63,20 @@ import java.util.List; /** - * Reader for {@link StringFrameColumnWriter}, types {@link ColumnType#STRING} and {@link ColumnType#STRING_ARRAY}. + * Reader for {@link StringFrameColumnWriter}, type {@link ColumnType#STRING}. */ public class StringFrameColumnReader implements FrameColumnReader { private final int columnNumber; - private final boolean asArray; /** * Create a new reader. * * @param columnNumber column number - * @param asArray true for {@link ColumnType#STRING_ARRAY}, false for {@link ColumnType#STRING} */ - StringFrameColumnReader(int columnNumber, boolean asArray) + StringFrameColumnReader(int columnNumber) { this.columnNumber = columnNumber; - this.asArray = asArray; } @Override @@ -92,18 +85,20 @@ public Column readRACColumn(Frame frame) final Memory memory = frame.region(columnNumber); validate(memory); + if (isMultiValue(memory)) { + throw InvalidInput.exception("Encountered a multi value column. Window processing does not support MVDs. " + + "Consider using UNNEST or MV_TO_ARRAY."); + } final long positionOfLengths = getStartOfStringLengthSection(frame.numRows(), false); final long positionOfPayloads = getStartOfStringDataSection(memory, frame.numRows(), false); - StringFrameColumn frameCol = - new StringFrameColumn( - frame, - false, - memory, - positionOfLengths, - positionOfPayloads, - asArray || isMultiValue(memory) // Read MVDs as String arrays - ); + StringFrameColumn frameCol = new StringFrameColumn( + frame, + false, + memory, + positionOfLengths, + positionOfPayloads + ); return new ColumnAccessorBasedColumn(frameCol); } @@ -118,35 +113,19 @@ public ColumnPlus readColumn(final Frame frame) final long startOfStringLengthSection = getStartOfStringLengthSection(frame.numRows(), multiValue); final long startOfStringDataSection = getStartOfStringDataSection(memory, frame.numRows(), multiValue); - final BaseColumn baseColumn; - - if (asArray) { - baseColumn = new StringArrayFrameColumn( - frame, - multiValue, - memory, - startOfStringLengthSection, - startOfStringDataSection - ); - } else { - baseColumn = new StringFrameColumn( - frame, - multiValue, - memory, - startOfStringLengthSection, - startOfStringDataSection, - false - ); - } + final BaseColumn baseColumn = new StringFrameColumn( + frame, + multiValue, + memory, + startOfStringLengthSection, + startOfStringDataSection + ); return new ColumnPlus( baseColumn, - new ColumnCapabilitiesImpl().setType(asArray ? ColumnType.STRING_ARRAY : ColumnType.STRING) - .setHasMultipleValues(!asArray && multiValue) - .setDictionaryEncoded(false) - .setHasBitmapIndexes(false) - .setHasSpatialIndexes(false) - .setHasNulls(ColumnCapabilities.Capable.UNKNOWN), + new ColumnCapabilitiesImpl().setType(ColumnType.STRING) + .setHasMultipleValues(multiValue) + .setDictionaryEncoded(false), frame.numRows() ); } @@ -159,12 +138,11 @@ private void validate(final Memory region) } final byte typeCode = region.getByte(0); - final byte expectedTypeCode = asArray ? FrameColumnWriters.TYPE_STRING_ARRAY : FrameColumnWriters.TYPE_STRING; - if (typeCode != expectedTypeCode) { + if (typeCode != FrameColumnWriters.TYPE_STRING) { throw DruidException.defensive( "Column[%s] does not have the correct type code; expected[%s], got[%s]", columnNumber, - expectedTypeCode, + FrameColumnWriters.TYPE_STRING, typeCode ); } @@ -172,7 +150,7 @@ private void validate(final Memory region) private static boolean isMultiValue(final Memory memory) { - return memory.getByte(1) == 1; + return memory.getByte(StringFrameColumnWriter.MULTI_VALUE_POSITION) == StringFrameColumnWriter.MULTI_VALUE_BYTE; } private static long getStartOfCumulativeLengthSection() @@ -213,8 +191,7 @@ private static long getStartOfStringDataSection( return getStartOfStringLengthSection(numRows, multiValue) + (long) Integer.BYTES * totalNumValues; } - @VisibleForTesting - static class StringFrameColumn extends ObjectColumnAccessorBase implements DictionaryEncodedColumn + private static class StringFrameColumn extends ObjectColumnAccessorBase implements DictionaryEncodedColumn { private final Frame frame; private final Memory memory; @@ -226,18 +203,12 @@ static class StringFrameColumn extends ObjectColumnAccessorBase implements Dicti */ private final boolean multiValue; - /** - * Whether the column is being read as {@link ColumnType#STRING_ARRAY} (true) or {@link ColumnType#STRING} (false). - */ - private final boolean asArray; - private StringFrameColumn( Frame frame, boolean multiValue, Memory memory, long startOfStringLengthSection, - long startOfStringDataSection, - final boolean asArray + long startOfStringDataSection ) { this.frame = frame; @@ -245,7 +216,6 @@ private StringFrameColumn( this.memory = memory; this.startOfStringLengthSection = startOfStringLengthSection; this.startOfStringDataSection = startOfStringDataSection; - this.asArray = asArray; } @Override @@ -293,11 +263,142 @@ public int getCardinality() @Override public DimensionSelector makeDimensionSelector(ReadableOffset offset, @Nullable ExtractionFn extractionFn) { - if (asArray) { - throw new ISE("Cannot call makeDimensionSelector on field of type [%s]", ColumnType.STRING_ARRAY); - } + if (multiValue) { + class MultiValueSelector implements DimensionSelector + { + private int currentRow = -1; + private List currentValues = null; + private final RangeIndexedInts indexedInts = new RangeIndexedInts(); + + @Override + public int getValueCardinality() + { + return CARDINALITY_UNKNOWN; + } + + @Nullable + @Override + public String lookupName(int id) + { + populate(); + final ByteBuffer buf = currentValues.get(id); + final String s = buf == null ? null : StringUtils.fromUtf8(buf.duplicate()); + return extractionFn == null ? s : extractionFn.apply(s); + } + + @Nullable + @Override + public ByteBuffer lookupNameUtf8(int id) + { + assert supportsLookupNameUtf8(); + populate(); + return currentValues.get(id); + } + + @Override + public boolean supportsLookupNameUtf8() + { + return extractionFn == null; + } + + @Override + public boolean nameLookupPossibleInAdvance() + { + return false; + } + + @Nullable + @Override + public IdLookup idLookup() + { + return null; + } + + @Override + public IndexedInts getRow() + { + populate(); + return indexedInts; + } + + @Override + public ValueMatcher makeValueMatcher(@Nullable String value) + { + return DimensionSelectorUtils.makeValueMatcherGeneric(this, value); + } - return makeDimensionSelectorInternal(offset, extractionFn); + @Override + public ValueMatcher makeValueMatcher(DruidPredicateFactory predicateFactory) + { + return DimensionSelectorUtils.makeValueMatcherGeneric(this, predicateFactory); + } + + @Nullable + @Override + public Object getObject() + { + return getRowAsObject(frame.physicalRow(offset.getOffset()), true); + } + + @Override + public Class classOfObject() + { + return String.class; + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + // Do nothing. + } + + private void populate() + { + final int row = offset.getOffset(); + + if (row != currentRow) { + currentValues = getRowAsListUtf8(frame.physicalRow(row)); + indexedInts.setSize(currentValues.size()); + currentRow = row; + } + } + } + + return new MultiValueSelector(); + } else { + class SingleValueSelector extends BaseSingleValueDimensionSelector + { + @Nullable + @Override + protected String getValue() + { + final String s = getString(frame.physicalRow(offset.getOffset())); + return extractionFn == null ? s : extractionFn.apply(s); + } + + @Nullable + @Override + public ByteBuffer lookupNameUtf8(int id) + { + assert supportsLookupNameUtf8(); + return getStringUtf8(frame.physicalRow(offset.getOffset())); + } + + @Override + public boolean supportsLookupNameUtf8() + { + return extractionFn == null; + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + // Do nothing. + } + } + + return new SingleValueSelector(); + } } @Override @@ -385,7 +486,7 @@ public void close() @Override public ColumnType getType() { - return asArray ? ColumnType.STRING_ARRAY : ColumnType.STRING; + return ColumnType.STRING; } @Override @@ -397,7 +498,7 @@ public int numRows() @Override protected Object getVal(int rowNum) { - return getString(frame.physicalRow(rowNum)); + return getRowAsObject(frame.physicalRow(rowNum), true); } @Override @@ -452,10 +553,6 @@ private String getString(final int index) /** * Returns the object at the given physical row number. * - * When {@link #asArray}, the return value is always of type {@code Object[]}. Otherwise, the return value - * is either an empty list (if the row is empty), a single String (if the row has one value), or a List - * of Strings (if the row has more than one value). - * * @param physicalRow physical row number * @param decode if true, return java.lang.String. If false, return UTF-8 ByteBuffer. */ @@ -483,11 +580,11 @@ private Object getRowAsObject(final int physicalRow, final boolean decode) } if (rowLength == 0) { - return asArray ? ObjectArrays.EMPTY_ARRAY : Collections.emptyList(); + return Collections.emptyList(); } else if (rowLength == 1) { final int index = cumulativeRowLength - 1; final Object o = decode ? getString(index) : getStringUtf8(index); - return asArray ? new Object[]{o} : o; + return o; } else { final Object[] row = new Object[rowLength]; @@ -496,26 +593,21 @@ private Object getRowAsObject(final int physicalRow, final boolean decode) row[i] = decode ? getString(index) : getStringUtf8(index); } - return asArray ? row : Arrays.asList(row); + return Arrays.asList(row); } } else { final Object o = decode ? getString(physicalRow) : getStringUtf8(physicalRow); - return asArray ? new Object[]{o} : o; + return o; } } /** - * Returns the value at the given physical row number as a list of ByteBuffers. Only valid when !asArray, i.e., - * when type is {@link ColumnType#STRING}. + * Returns the value at the given physical row number as a list of ByteBuffers. * * @param physicalRow physical row number */ private List getRowAsListUtf8(final int physicalRow) { - if (asArray) { - throw DruidException.defensive("Unexpected call for array column"); - } - final Object object = getRowAsObject(physicalRow, false); if (object == null) { @@ -527,185 +619,5 @@ private List getRowAsListUtf8(final int physicalRow) return Collections.singletonList((ByteBuffer) object); } } - - /** - * Selector used by this column. It's versatile: it can run as string array (asArray = true) or regular string - * column (asArray = false). - */ - private DimensionSelector makeDimensionSelectorInternal(ReadableOffset offset, @Nullable ExtractionFn extractionFn) - { - if (multiValue) { - class MultiValueSelector implements DimensionSelector - { - private int currentRow = -1; - private List currentValues = null; - private final RangeIndexedInts indexedInts = new RangeIndexedInts(); - - @Override - public int getValueCardinality() - { - return CARDINALITY_UNKNOWN; - } - - @Nullable - @Override - public String lookupName(int id) - { - populate(); - final ByteBuffer buf = currentValues.get(id); - final String s = buf == null ? null : StringUtils.fromUtf8(buf.duplicate()); - return extractionFn == null ? s : extractionFn.apply(s); - } - - @Nullable - @Override - public ByteBuffer lookupNameUtf8(int id) - { - assert supportsLookupNameUtf8(); - populate(); - return currentValues.get(id); - } - - @Override - public boolean supportsLookupNameUtf8() - { - return extractionFn == null; - } - - @Override - public boolean nameLookupPossibleInAdvance() - { - return false; - } - - @Nullable - @Override - public IdLookup idLookup() - { - return null; - } - - @Override - public IndexedInts getRow() - { - populate(); - return indexedInts; - } - - @Override - public ValueMatcher makeValueMatcher(@Nullable String value) - { - return DimensionSelectorUtils.makeValueMatcherGeneric(this, value); - } - - @Override - public ValueMatcher makeValueMatcher(DruidPredicateFactory predicateFactory) - { - return DimensionSelectorUtils.makeValueMatcherGeneric(this, predicateFactory); - } - - @Nullable - @Override - public Object getObject() - { - return getRowAsObject(frame.physicalRow(offset.getOffset()), true); - } - - @Override - public Class classOfObject() - { - return String.class; - } - - @Override - public void inspectRuntimeShape(RuntimeShapeInspector inspector) - { - // Do nothing. - } - - private void populate() - { - final int row = offset.getOffset(); - - if (row != currentRow) { - currentValues = getRowAsListUtf8(frame.physicalRow(row)); - indexedInts.setSize(currentValues.size()); - currentRow = row; - } - } - } - - return new MultiValueSelector(); - } else { - class SingleValueSelector extends BaseSingleValueDimensionSelector - { - @Nullable - @Override - protected String getValue() - { - final String s = getString(frame.physicalRow(offset.getOffset())); - return extractionFn == null ? s : extractionFn.apply(s); - } - - @Nullable - @Override - public ByteBuffer lookupNameUtf8(int id) - { - assert supportsLookupNameUtf8(); - return getStringUtf8(frame.physicalRow(offset.getOffset())); - } - - @Override - public boolean supportsLookupNameUtf8() - { - return extractionFn == null; - } - - @Override - public void inspectRuntimeShape(RuntimeShapeInspector inspector) - { - // Do nothing. - } - } - - return new SingleValueSelector(); - } - } - } - - static class StringArrayFrameColumn implements BaseColumn - { - private final StringFrameColumn delegate; - - private StringArrayFrameColumn( - Frame frame, - boolean multiValue, - Memory memory, - long startOfStringLengthSection, - long startOfStringDataSection - ) - { - this.delegate = new StringFrameColumn( - frame, - multiValue, - memory, - startOfStringLengthSection, - startOfStringDataSection, - true - ); - } - - @Override - @SuppressWarnings("rawtypes") - public ColumnValueSelector makeColumnValueSelector(ReadableOffset offset) - { - return delegate.makeDimensionSelectorInternal(offset, null); - } - - @Override - public void close() - { - delegate.close(); - } } } diff --git a/processing/src/main/java/org/apache/druid/frame/write/columnar/StringFrameColumnWriter.java b/processing/src/main/java/org/apache/druid/frame/write/columnar/StringFrameColumnWriter.java index 8eee0fd0cef9..75f64613388b 100644 --- a/processing/src/main/java/org/apache/druid/frame/write/columnar/StringFrameColumnWriter.java +++ b/processing/src/main/java/org/apache/druid/frame/write/columnar/StringFrameColumnWriter.java @@ -44,6 +44,9 @@ public abstract class StringFrameColumnWriter imp public static final long DATA_OFFSET = 1 /* type code */ + 1 /* single or multi-value? */; + public static final byte MULTI_VALUE_BYTE = (byte) 0x01; + public static final long MULTI_VALUE_POSITION = 1; + private final T selector; private final byte typeCode; protected final ColumnCapabilities.Capable multiValue; @@ -228,7 +231,7 @@ public long writeTo(final WritableMemory memory, final long startPosition) long currentPosition = startPosition; memory.putByte(currentPosition, typeCode); - memory.putByte(currentPosition + 1, writeMultiValue ? (byte) 1 : (byte) 0); + memory.putByte(currentPosition + 1, writeMultiValue ? MULTI_VALUE_BYTE : (byte) 0); currentPosition += 2; if (writeMultiValue) { diff --git a/processing/src/test/java/org/apache/druid/query/operator/window/RowsAndColumnsHelper.java b/processing/src/test/java/org/apache/druid/query/operator/window/RowsAndColumnsHelper.java index 2636156b53cc..cdc84620ab0f 100644 --- a/processing/src/test/java/org/apache/druid/query/operator/window/RowsAndColumnsHelper.java +++ b/processing/src/test/java/org/apache/druid/query/operator/window/RowsAndColumnsHelper.java @@ -29,6 +29,7 @@ import org.apache.druid.segment.column.ColumnType; import org.junit.Assert; +import java.util.ArrayList; import java.util.Collection; import java.util.LinkedHashMap; import java.util.Map; @@ -280,6 +281,17 @@ public void validate(String msgBase, Column col) } else { Assert.assertEquals(msg, ((Long) expectedVal).longValue(), accessor.getLong(i)); } + } else if (expectedVal instanceof Object[]) { + Object actualVal = accessor.getObject(i); + if (expectedNulls[i]) { + Assert.assertNull(msg, accessor.getObject(i)); + } else { + if (actualVal instanceof ArrayList) { + Assert.assertArrayEquals(msg, (Object[]) expectedVals[i], ((ArrayList) actualVal).toArray()); + } else { + Assert.assertArrayEquals(msg, (Object[]) expectedVals[i], (Object[]) actualVal); + } + } } else { if (expectedNulls[i]) { Assert.assertNull(msg, accessor.getObject(i)); diff --git a/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/TestVirtualColumnEvaluationRowsAndColumnsTest.java b/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/EvaluateRowsAndColumnsTest.java similarity index 73% rename from processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/TestVirtualColumnEvaluationRowsAndColumnsTest.java rename to processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/EvaluateRowsAndColumnsTest.java index 99d5dfabc89b..e2cee35a8e9a 100644 --- a/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/TestVirtualColumnEvaluationRowsAndColumnsTest.java +++ b/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/EvaluateRowsAndColumnsTest.java @@ -38,32 +38,44 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assume.assumeNotNull; -public class TestVirtualColumnEvaluationRowsAndColumnsTest extends SemanticTestBase +public class EvaluateRowsAndColumnsTest extends SemanticTestBase { - public TestVirtualColumnEvaluationRowsAndColumnsTest(String name, Function fn) + public EvaluateRowsAndColumnsTest(String name, Function fn) { super(name, fn); } @Test - public void testMaterializeVirtualColumns() + public void testMaterializeColumns() { Object[][] vals = new Object[][] { - {1L, "a", 123L, 0L}, - {2L, "a", 456L, 1L}, - {3L, "b", 789L, 2L}, - {4L, "b", 123L, 3L}, + {1L, "a", 123L, new Object[]{"xyz", "x"}, 0L}, + {2L, "a", 456L, new Object[]{"abc"}, 1L}, + {3L, "b", 789L, new Object[]{null}, 2L}, + {4L, null, 123L, null, 3L}, }; RowSignature siggy = RowSignature.builder() .add("__time", ColumnType.LONG) .add("dim", ColumnType.STRING) .add("val", ColumnType.LONG) + .add("array", ColumnType.STRING_ARRAY) .add("arrayIndex", ColumnType.LONG) .build(); final RowsAndColumns base = make(MapOfColumnsRowsAndColumns.fromRowObjects(vals, siggy)); + Object[] expectedArr = new Object[][] { + {"xyz", "x"}, + {"abc"}, + {null}, + null + }; + + new RowsAndColumnsHelper() + .expectColumn("array", expectedArr, ColumnType.STRING_ARRAY) + .validate(base); + assumeNotNull("skipping: CursorFactory not supported", base.as(CursorFactory.class)); LazilyDecoratedRowsAndColumns ras = new LazilyDecoratedRowsAndColumns( @@ -82,12 +94,18 @@ public void testMaterializeVirtualColumns() // do the materialziation ras.numRows(); - assertEquals(Lists.newArrayList("__time", "dim", "val", "arrayIndex", "expr"), ras.getColumnNames()); + assertEquals(Lists.newArrayList("__time", "dim", "val", "array", "arrayIndex", "expr"), ras.getColumnNames()); new RowsAndColumnsHelper() .expectColumn("expr", new long[] {123 * 2, 456L * 2, 789 * 2, 123 * 2}) .validate(ras); - } + new RowsAndColumnsHelper() + .expectColumn("dim", new String[] {"a", "a", "b", null}, ColumnType.STRING) + .validate(ras); + new RowsAndColumnsHelper() + .expectColumn("array", expectedArr, ColumnType.STRING_ARRAY) + .validate(ras); + } }