Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
kgyrtkirk committed Nov 10, 2023
1 parent cff97d2 commit 27a367d
Showing 1 changed file with 3 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.query.operator.window.WindowFrame;
import org.apache.druid.query.operator.window.WindowFrame.PeerType;
import org.apache.druid.query.operator.window.ranking.WindowDenseRankProcessor;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.ConstantObjectColumn;
import org.apache.druid.query.rowsandcols.column.ObjectArrayColumn;
Expand All @@ -41,9 +40,7 @@
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;

Expand Down Expand Up @@ -208,9 +205,9 @@ private int relativeRangeId(int rangeOffset)
*/
static class XRange {

public final int rowIdx;
public final int u;
public final int v;
private int rowIdx;

public XRange(int rowIdx, int u, int v)
{
Expand All @@ -220,92 +217,9 @@ public XRange(int rowIdx, int u, int v)
}
}


private RowsAndColumns computeRangeAggregates0(
AggregatorFactory[] aggFactories,
WindowFrame frame,
String changeColName)
{

new WindowDenseRankProcessor(
frame.getOrderByColNames(),
DefaultFramedOnHeapAggregatable.CHANGE_COL_NAME
).process(rac);

AtomicInteger rowIdProvider = new AtomicInteger(rac.numRows() - 1);
final ColumnSelectorFactory columnSelectorFactory = ColumnSelectorFactoryMaker.fromRAC(rac).make(rowIdProvider);
ColumnValueSelector<?> changeColSelector = columnSelectorFactory.makeColumnValueSelector(changeColName);
long lastRangeId = changeColSelector.getLong();
rowIdProvider.set(rac.numRows() - 1);
// FIXME need (int) ?
int numRanges = (int) changeColSelector.getLong();

SlidingWindowPopulator swp = new SlidingWindowPopulator(
new AggDispatcher(aggFactories, columnSelectorFactory),
rac.numRows(),
numRanges,
frame
);

for (int i = 0; i < rac.numRows(); i++) {
rowIdProvider.set(i);
long currentRangeId = changeColSelector.getLong();
if(currentRangeId != lastRangeId) {
swp.boundary();
}
swp.aggregate();
}




throw new RuntimeException();
}

static class SlidingWindowPopulator {

private int width;
private int centerOffset;
private Deque<AggDispatcher.AggCell> frameStates;

public SlidingWindowPopulator(
AggDispatcher aggDispatcher,
int numRows,
int numRanges,
WindowFrame frame)
{
int lowerOffsetClamped = frame.getLowerOffsetClamped(numRanges);
int upperOffsetClamped = frame.getUpperOffsetClamped(numRanges);
width = Math.min(numRanges, lowerOffsetClamped + 1 + upperOffsetClamped);
centerOffset = upperOffsetClamped;

frameStates = new ArrayDeque<>();
for (int i = 0; i < lowerOffsetClamped + 1; i++) {
frameStates.addLast(aggDispatcher.newCell());
}
}

public void aggregate()
{
for (AggDispatcher.AggCell aggCell : frameStates) {
aggCell.aggregate();
}
}

public void boundary()
{
if(frameStates.size() >= centerOffset) {

if (centerOffset < width) {
centerOffset++;
}
}
}

}

// pretty close to AggregatorAdapters ; but for plain aggs
static class AggDispatcher {
static class AggDispatcher
{

private final AggregatorFactory[] aggFactories;
private ColumnSelectorFactory columnSelectorFactory;
Expand Down

0 comments on commit 27a367d

Please sign in to comment.