Skip to content

Commit

Permalink
fix: adapter module
Browse files Browse the repository at this point in the history
  • Loading branch information
vibhatha committed Jan 24, 2024
1 parent 3fe598a commit 21153cc
Show file tree
Hide file tree
Showing 25 changed files with 74 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -95,7 +96,6 @@
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Type;
import org.apache.avro.io.Decoder;

/**
Expand Down Expand Up @@ -159,7 +159,7 @@ private static Consumer createConsumer(

final BufferAllocator allocator = config.getAllocator();

final Type type = schema.getType();
final Schema.Type type = schema.getType();
final LogicalType logicalType = schema.getLogicalType();

final ArrowType arrowType;
Expand Down Expand Up @@ -215,7 +215,7 @@ private static Consumer createConsumer(
vector = createVector(consumerVector, fieldType, name, allocator);
consumer = new AvroTimeMillisConsumer((TimeMilliVector) vector);
} else {
arrowType = new ArrowType.Int(32, /*signed=*/true);
arrowType = new ArrowType.Int(32, /*isSigned=*/true);
fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema));
vector = createVector(consumerVector, fieldType, name, allocator);
consumer = new AvroIntConsumer((IntVector) vector);
Expand Down Expand Up @@ -244,7 +244,7 @@ private static Consumer createConsumer(
vector = createVector(consumerVector, fieldType, name, allocator);
consumer = new AvroTimestampMicrosConsumer((TimeStampMicroVector) vector);
} else {
arrowType = new ArrowType.Int(64, /*signed=*/true);
arrowType = new ArrowType.Int(64, /*isSigned=*/true);
fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema));
vector = createVector(consumerVector, fieldType, name, allocator);
consumer = new AvroLongConsumer((BigIntVector) vector);
Expand Down Expand Up @@ -278,7 +278,7 @@ private static Consumer createConsumer(
case NULL:
arrowType = new ArrowType.Null();
fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema));
vector = fieldType.createNewSingleVector(name, allocator, /*schemaCallback=*/null);
vector = fieldType.createNewSingleVector(name, allocator, /*schemaCallBack=*/null);
consumer = new AvroNullConsumer((NullVector) vector);
break;
default:
Expand All @@ -305,7 +305,7 @@ private static ArrowType createDecimalArrowType(LogicalTypes.Decimal logicalType
private static Consumer createSkipConsumer(Schema schema) {

SkipFunction skipFunction;
Type type = schema.getType();
Schema.Type type = schema.getType();

switch (type) {
case UNION:
Expand Down Expand Up @@ -391,7 +391,7 @@ static CompositeAvroConsumer createCompositeConsumer(
final Set<String> skipFieldNames = config.getSkipFieldNames();

Schema.Type type = schema.getType();
if (type == Type.RECORD) {
if (type == Schema.Type.RECORD) {
for (Schema.Field field : schema.getFields()) {
if (skipFieldNames.contains(field.name())) {
consumers.add(createSkipConsumer(field.schema()));
Expand All @@ -416,7 +416,7 @@ private static FieldVector createVector(FieldVector consumerVector, FieldType fi

/**
* Builds a default field name for an Arrow type: the name of the type's
* {@link Types.MinorType}, converted to lower case.
*
* @param type the Arrow type to derive a name from
* @return the lower-cased minor type name
*/
private static String getDefaultFieldName(ArrowType type) {
Types.MinorType minorType = Types.getMinorTypeForArrowType(type);
return minorType.name().toLowerCase();
// Locale.ROOT keeps the lower-casing independent of the JVM's default locale.
return minorType.name().toLowerCase(Locale.ROOT);
}

private static Field avroSchemaToField(Schema schema, String name, AvroToArrowConfig config) {
Expand All @@ -429,7 +429,7 @@ private static Field avroSchemaToField(
AvroToArrowConfig config,
Map<String, String> externalProps) {

final Type type = schema.getType();
final Schema.Type type = schema.getType();
final LogicalType logicalType = schema.getLogicalType();
final List<Field> children = new ArrayList<>();
final FieldType fieldType;
Expand Down Expand Up @@ -457,7 +457,7 @@ private static Field avroSchemaToField(
FieldType structFieldType = new FieldType(false, new ArrowType.Struct(), /*dictionary=*/null);
Field structField = new Field("internal", structFieldType, Arrays.asList(keyField, valueField));
children.add(structField);
fieldType = createFieldType(new ArrowType.Map(/*keySorted=*/false), schema, externalProps);
fieldType = createFieldType(new ArrowType.Map(/*keysSorted=*/false), schema, externalProps);
break;
case RECORD:
final Set<String> skipFieldNames = config.getSkipFieldNames();
Expand Down Expand Up @@ -509,7 +509,7 @@ private static Field avroSchemaToField(
} else if (logicalType instanceof LogicalTypes.TimeMillis) {
intArrowType = new ArrowType.Time(TimeUnit.MILLISECOND, 32);
} else {
intArrowType = new ArrowType.Int(32, /*signed=*/true);
intArrowType = new ArrowType.Int(32, /*isSigned=*/true);
}
fieldType = createFieldType(intArrowType, schema, externalProps);
break;
Expand All @@ -525,7 +525,7 @@ private static Field avroSchemaToField(
} else if (logicalType instanceof LogicalTypes.TimestampMicros) {
longArrowType = new ArrowType.Timestamp(TimeUnit.MICROSECOND, null);
} else {
longArrowType = new ArrowType.Int(64, /*signed=*/true);
longArrowType = new ArrowType.Int(64, /*isSigned=*/true);
}
fieldType = createFieldType(longArrowType, schema, externalProps);
break;
Expand Down Expand Up @@ -668,7 +668,7 @@ private static Consumer createUnionConsumer(Schema schema, String name, AvroToAr
FieldVector consumerVector) {
final int size = schema.getTypes().size();

final boolean nullable = schema.getTypes().stream().anyMatch(t -> t.getType() == Type.NULL);
final boolean nullable = schema.getTypes().stream().anyMatch(t -> t.getType() == Schema.Type.NULL);

UnionVector unionVector;
if (consumerVector == null) {
Expand Down Expand Up @@ -709,7 +709,7 @@ static VectorSchemaRoot avroToArrowVectors(
final Set<String> skipFieldNames = config.getSkipFieldNames();

Schema.Type type = schema.getType();
if (type == Type.RECORD) {
if (type == Schema.Type.RECORD) {
for (Schema.Field field : schema.getFields()) {
if (skipFieldNames.contains(field.name())) {
consumers.add(createSkipConsumer(field.schema()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ public boolean hasNext() {
/**
* Gets the next vector. The user is responsible for freeing its resources.
*/
@Override
public VectorSchemaRoot next() {
Preconditions.checkArgument(hasNext());
VectorSchemaRoot returned = nextBatch;
Expand All @@ -177,6 +178,7 @@ public VectorSchemaRoot next() {
/**
* Clean up resources.
*/
@Override
public void close() {
if (nextBatch != null) {
nextBatch.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public interface Consumer<T extends FieldVector> extends AutoCloseable {
/**
* Closes this consumer. Intended to be called when an exception occurs, to avoid a potential leak.
*
* @throws Exception declared by the signature; the failure conditions depend on the implementation
*/
@Override
void close() throws Exception;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.junit.Assert.assertEquals;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
Expand Down Expand Up @@ -223,7 +224,7 @@ public void testSkipStringField() throws Exception {
ArrayList<GenericRecord> expectedData = new ArrayList<>();

for (int i = 0; i < 5; i++) {
final byte[] testBytes = ("test" + i).getBytes();
final byte[] testBytes = ("test" + i).getBytes(StandardCharsets.UTF_8);
GenericRecord record = new GenericData.Record(schema);
GenericData.Fixed fixed = new GenericData.Fixed(schema.getField("f0").schema());
fixed.bytes(testBytes);
Expand Down Expand Up @@ -257,7 +258,7 @@ public void testSkipBytesField() throws Exception {
ArrayList<GenericRecord> expectedData = new ArrayList<>();

for (int i = 0; i < 5; i++) {
final byte[] testBytes = ("test" + i).getBytes();
final byte[] testBytes = ("test" + i).getBytes(StandardCharsets.UTF_8);
GenericRecord record = new GenericData.Record(schema);
GenericData.Fixed fixed = new GenericData.Fixed(schema.getField("f0").schema());
fixed.bytes(testBytes);
Expand Down Expand Up @@ -291,7 +292,7 @@ public void testSkipFixedField() throws Exception {
ArrayList<GenericRecord> expectedData = new ArrayList<>();

for (int i = 0; i < 5; i++) {
final byte[] testBytes = ("test" + i).getBytes();
final byte[] testBytes = ("test" + i).getBytes(StandardCharsets.UTF_8);
GenericRecord record = new GenericData.Record(schema);
GenericData.Fixed fixed = new GenericData.Fixed(schema.getField("f0").schema());
fixed.bytes(testBytes);
Expand Down Expand Up @@ -325,7 +326,7 @@ public void testSkipEnumField() throws Exception {
ArrayList<GenericRecord> expectedData = new ArrayList<>();

for (int i = 0; i < 5; i++) {
final byte[] testBytes = ("test" + i).getBytes();
final byte[] testBytes = ("test" + i).getBytes(StandardCharsets.UTF_8);
GenericRecord record = new GenericData.Record(schema);
GenericData.Fixed fixed = new GenericData.Fixed(schema.getField("f0").schema());
fixed.bytes(testBytes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ protected void checkPrimitiveResult(List data, FieldVector vector) {
}
}

protected void checkRecordResult(Schema schema, ArrayList<GenericRecord> data, VectorSchemaRoot root) {
protected void checkRecordResult(Schema schema, List<GenericRecord> data, VectorSchemaRoot root) {
assertEquals(data.size(), root.getRowCount());
assertEquals(schema.getFields().size(), root.getFieldVectors().size());

Expand Down Expand Up @@ -194,7 +194,7 @@ protected void checkArrayResult(List<List<?>> expected, List<ListVector> vectors
}
}

protected void checkRecordResult(Schema schema, ArrayList<GenericRecord> data, List<VectorSchemaRoot> roots) {
protected void checkRecordResult(Schema schema, List<GenericRecord> data, List<VectorSchemaRoot> roots) {
roots.forEach(root -> {
assertEquals(schema.getFields().size(), root.getFieldVectors().size());
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,13 +181,13 @@ public void runLargeNumberOfRows() throws Exception {
}
}

assertEquals(x, targetRows);
assertEquals(targetRows, x);
}

/**
* Fake avro decoder to test large data.
*/
private class FakeDecoder extends Decoder {
private static class FakeDecoder extends Decoder {

private int numRows;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,8 @@ public void testFixedAttributes() throws Exception {
Schema schema = getSchema("attrs/test_fixed_attr.avsc");

List<GenericData.Fixed> data = new ArrayList<>();
List<byte[]> expected = new ArrayList<>();
for (int i = 0; i < 5; i++) {
byte[] value = ("value" + i).getBytes(StandardCharsets.UTF_8);
expected.add(value);
GenericData.Fixed fixed = new GenericData.Fixed(schema);
fixed.bytes(value);
data.add(fixed);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public void consume(InputStream is) throws IOException {
vector.getDataBuffer().setBytes(startOffset + dataLength, reuseBytes, 0, read);
dataLength += read;
}
offsetBuffer.setInt((currentIndex + 1) * VarBinaryVector.OFFSET_WIDTH, startOffset + dataLength);
offsetBuffer.setInt((currentIndex + 1) * ((long) VarBinaryVector.OFFSET_WIDTH), startOffset + dataLength);
BitVectorHelper.setBit(vector.getValidityBuffer(), currentIndex);
vector.setLastSet(currentIndex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public void consume(ResultSet resultSet) throws SQLException {

ArrowBuf dataBuffer = vector.getDataBuffer();
ArrowBuf offsetBuffer = vector.getOffsetBuffer();
int startIndex = offsetBuffer.getInt(currentIndex * 4);
int startIndex = offsetBuffer.getInt(currentIndex * 4L);
while (read <= length) {
String str = clob.getSubString(read, readSize);
byte[] bytes = str.getBytes(StandardCharsets.UTF_8);
Expand All @@ -106,7 +106,7 @@ public void consume(ResultSet resultSet) throws SQLException {
totalBytes += bytes.length;
read += readSize;
}
offsetBuffer.setInt((currentIndex + 1) * 4, startIndex + totalBytes);
offsetBuffer.setInt((currentIndex + 1) * 4L, startIndex + totalBytes);
BitVectorHelper.setBit(vector.getValidityBuffer(), currentIndex);
vector.setLastSet(currentIndex);
}
Expand Down Expand Up @@ -139,7 +139,7 @@ public void consume(ResultSet resultSet) throws SQLException {

ArrowBuf dataBuffer = vector.getDataBuffer();
ArrowBuf offsetBuffer = vector.getOffsetBuffer();
int startIndex = offsetBuffer.getInt(currentIndex * 4);
int startIndex = offsetBuffer.getInt(currentIndex * 4L);
while (read <= length) {
String str = clob.getSubString(read, readSize);
byte[] bytes = str.getBytes(StandardCharsets.UTF_8);
Expand All @@ -157,7 +157,7 @@ public void consume(ResultSet resultSet) throws SQLException {
totalBytes += bytes.length;
read += readSize;
}
offsetBuffer.setInt((currentIndex + 1) * 4, startIndex + totalBytes);
offsetBuffer.setInt((currentIndex + 1) * 4L, startIndex + totalBytes);
BitVectorHelper.setBit(vector.getValidityBuffer(), currentIndex);
vector.setLastSet(currentIndex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public interface JdbcConsumer<T extends ValueVector> extends AutoCloseable {
/**
* Closes this consumer and performs cleanup work, such as clearing the reused ArrowBuf.
*
* @throws Exception declared by the signature; the failure conditions depend on the implementation
*/
@Override
void close() throws Exception;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ void bindOrder() throws SQLException {
final VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
final JdbcParameterBinder binder =
JdbcParameterBinder.builder(statement, root)
.bind(/*paramIndex=*/ 1, /*colIndex=*/ 2)
.bind(/*paramIndex=*/ 2, /*colIndex=*/ 0)
.bind(/*parameterIndex=*/ 1, /*columnIndex=*/ 2)
.bind(/*parameterIndex=*/ 2, /*columnIndex=*/ 0)
.build();
assertThat(binder.next()).isFalse();

Expand Down Expand Up @@ -169,7 +169,7 @@ void customBinder() throws SQLException {
final JdbcParameterBinder binder =
JdbcParameterBinder.builder(statement, root)
.bind(
/*paramIndex=*/ 1,
/*parameterIndex=*/ 1,
new ColumnBinder() {
private final IntVector vector = (IntVector) root.getVector(0);
@Override
Expand Down Expand Up @@ -275,11 +275,11 @@ void time32() throws SQLException {
/**
* Runs {@code testSimpleType} for 64-bit {@code ArrowType.Time} columns, once with
* microsecond units and once with nanosecond units, both mapped to {@code Types.TIME}.
* The setter lambdas scale {@code value.getTime()} by 1_000 and 1_000_000 respectively
* before storing it in the vector.
*/
@Test
void time64() throws SQLException {
testSimpleType(new ArrowType.Time(TimeUnit.MICROSECOND, 64), Types.TIME,
(valueVectors, index, value) -> valueVectors.setSafe(index, (int) (value.getTime() * 1_000)),
(valueVectors, index, value) -> valueVectors.setSafe(index, (value.getTime() * 1_000)),
TimeMicroVector::setNull,
Arrays.asList(new Time(-128_000), new Time(104_000), new Time(-42_000)));
testSimpleType(new ArrowType.Time(TimeUnit.NANOSECOND, 64), Types.TIME,
(valueVectors, index, value) -> valueVectors.setSafe(index, (int) (value.getTime() * 1_000_000)),
(valueVectors, index, value) -> valueVectors.setSafe(index, (value.getTime() * 1_000_000)),
TimeNanoVector::setNull,
Arrays.asList(new Time(-128), new Time(104), new Time(-42)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@

import static org.assertj.core.api.Assertions.assertThat;

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
Expand All @@ -34,7 +31,6 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

import org.apache.arrow.memory.RootAllocator;
Expand Down Expand Up @@ -228,9 +224,4 @@ private String getColumnComment(DatabaseMetaData metaData, String tableName, Str
}
return null;
}

/**
* Loads a resource, resolved relative to {@code JdbcToArrowCommentMetadataTest}, and returns
* its full contents decoded as UTF-8.
*
* @param expectedResource name of the resource to load
* @return the resource contents as a string
* @throws java.io.IOException if the resource file cannot be read
* @throws java.net.URISyntaxException if the resource URL cannot be converted to a URI
* @throws NullPointerException if the resource cannot be found
*/
private String getExpectedSchema(String expectedResource) throws java.io.IOException, java.net.URISyntaxException {
// Fail fast with a NullPointerException when the resource is missing.
final java.net.URL resourceUrl =
Objects.requireNonNull(JdbcToArrowCommentMetadataTest.class.getResource(expectedResource));
final byte[] contents = Files.readAllBytes(Paths.get(resourceUrl.toURI()));
return new String(contents, StandardCharsets.UTF_8);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Calendar;
import java.util.HashMap;
import java.util.Locale;
import java.util.Objects;
import java.util.TimeZone;

import org.apache.arrow.memory.BufferAllocator;
Expand Down Expand Up @@ -89,17 +90,17 @@ public void testConfig() {
JdbcToArrowConfigBuilder builder = new JdbcToArrowConfigBuilder(allocator, calendar);
JdbcToArrowConfig config = builder.build();

assertTrue(allocator == config.getAllocator());
assertTrue(calendar == config.getCalendar());
assertTrue(Objects.equals(allocator, config.getAllocator()));
assertTrue(Objects.equals(calendar, config.getCalendar()));

Calendar newCalendar = Calendar.getInstance();
BufferAllocator newAllocator = new RootAllocator(Integer.SIZE);

builder.setAllocator(newAllocator).setCalendar(newCalendar);
config = builder.build();

assertTrue(newAllocator == config.getAllocator());
assertTrue(newCalendar == config.getCalendar());
assertTrue(Objects.equals(newAllocator, config.getAllocator()));
assertTrue(Objects.equals(newCalendar, config.getCalendar()));
}

@Test
Expand Down
Loading

0 comments on commit 21153cc

Please sign in to comment.