Skip to content

Commit

Permalink
Remove EOFException catch block from the Avro decoders (#15018)
Browse files Browse the repository at this point in the history
* Remove stale comment since we're on Avro version 1.11.1

* Update exception blocks. With 1.11.1, read() only throws IOException.

* Add unit tests

* Cleanup and add more tests.
  • Loading branch information
abhishekrb19 authored Sep 25, 2023
1 parent f7a5491 commit ba6101a
Show file tree
Hide file tree
Showing 6 changed files with 171 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.ParseException;

import java.io.EOFException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Map;
Expand Down Expand Up @@ -87,17 +86,8 @@ public GenericRecord parse(ByteBuffer bytes)
try (ByteBufferInputStream inputStream = new ByteBufferInputStream(Collections.singletonList(bytes))) {
return reader.read(null, DecoderFactory.get().binaryDecoder(inputStream, null));
}
catch (EOFException eof) {
// waiting for avro v1.9.0 (#AVRO-813)
throw new ParseException(
null,
eof,
"Avro's unnecessary EOFException, detail: [%s]",
"https://issues.apache.org/jira/browse/AVRO-813"
);
}
catch (Exception e) {
throw new ParseException(null, e, "Fail to decode avro message!");
throw new ParseException(null, e, "Failed to read Avro message");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.ParseException;

import java.io.EOFException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Map;
Expand Down Expand Up @@ -101,35 +100,26 @@ public Map<String, Map<String, Object>> getSchemas()
public GenericRecord parse(ByteBuffer bytes)
{
if (bytes.remaining() < 5) {
throw new ParseException(null, "record must have at least 5 bytes carrying version and schemaId");
throw new ParseException(null, "Record must have at least 5 bytes carrying version and schemaId");
}

byte version = bytes.get();
if (version != V1) {
throw new ParseException(null, "found record of arbitrary version [%s]", version);
throw new ParseException(null, "Found record of arbitrary version[%s]", version);
}

int schemaId = bytes.getInt();
Schema schemaObj = schemaObjs.get(schemaId);
if (schemaObj == null) {
throw new ParseException(null, "Failed to find schema for id [%s]", schemaId);
throw new ParseException(null, "Failed to find schema for id[%s]", schemaId);
}

DatumReader<GenericRecord> reader = new GenericDatumReader<>(schemaObj);
try (ByteBufferInputStream inputStream = new ByteBufferInputStream(Collections.singletonList(bytes))) {
return reader.read(null, DecoderFactory.get().binaryDecoder(inputStream, null));
}
catch (EOFException eof) {
// waiting for avro v1.9.0 (#AVRO-813)
throw new ParseException(
null,
eof,
"Avro's unnecessary EOFException, detail: [%s]",
"https://issues.apache.org/jira/browse/AVRO-813"
);
}
catch (Exception e) {
throw new ParseException(null, e, "Fail to decode avro message with schemaId [%s].", schemaId);
throw new ParseException(null, e, "Failed to read Avro message with schema id[%s]", schemaId);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@
import org.schemarepo.api.TypedSchemaRepository;
import org.schemarepo.api.converter.AvroSchemaConverter;

import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Objects;
Expand Down Expand Up @@ -83,17 +81,8 @@ public GenericRecord parse(ByteBuffer bytes)
try (ByteBufferInputStream inputStream = new ByteBufferInputStream(Collections.singletonList(bytes))) {
return reader.read(null, DecoderFactory.get().binaryDecoder(inputStream, null));
}
catch (EOFException eof) {
// waiting for avro v1.9.0 (#AVRO-813)
throw new ParseException(
null,
eof,
"Avro's unnecessary EOFException, detail: [%s]",
"https://issues.apache.org/jira/browse/AVRO-813"
);
}
catch (IOException e) {
throw new ParseException(null, e, "Fail to decode avro message!");
catch (Exception e) {
throw new ParseException(null, e, "Failed to read Avro message");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
Expand All @@ -43,6 +44,7 @@
import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec;
import org.apache.druid.java.util.common.parsers.JSONPathFieldType;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -299,6 +301,45 @@ public void testParseSchemaless() throws SchemaValidationException, IOException
}
}

/**
 * Verifies that the schema-repo based decoder wraps a read failure in a {@link ParseException}.
 *
 * The payload is a 20-byte header buffer (into which the converter writes the registered schema id)
 * followed by the binary-encoded datum. The test expects the resulting exception to carry an
 * {@link AvroRuntimeException} as its cause and the "Failed to read Avro message" text.
 */
@Test
public void testParseInvalidData() throws IOException, SchemaValidationException
{
  final Repository repo = new InMemoryRepository(null);
  final SchemaRepoBasedAvroBytesDecoder<String, Integer> bytesDecoder =
      new SchemaRepoBasedAvroBytesDecoder<>(new Avro1124SubjectAndIdConverter(TOPIC), repo);

  // Record that will be serialized after the header.
  final GenericRecord datum = buildSomeAvroDatum();

  // Register the schema so that the id embedded in the header resolves to a known schema.
  final Avro1124SubjectAndIdConverter idConverter = new Avro1124SubjectAndIdConverter(TOPIC);
  final TypedSchemaRepository<Integer, Schema, String> typedRepo = new TypedSchemaRepository<>(
      repo,
      new IntegerConverter(),
      new AvroSchemaConverter(),
      new IdentityConverter()
  );
  final Integer schemaId = typedRepo.registerSchema(TOPIC, SomeAvroDatum.getClassSchema());

  // array() returns the whole 20-byte backing array regardless of the buffer position.
  // NOTE(review): presumably the converter fills only part of it, so the remaining bytes act as the
  // padding that corrupts the stream -- confirm against Avro1124SubjectAndIdConverter.
  final ByteBuffer header = ByteBuffer.allocate(20);
  idConverter.putSubjectAndId(schemaId, header);

  final ByteArrayOutputStream encoded = new ByteArrayOutputStream();
  encoded.write(new byte[0]); // zero-length write; contributes no bytes
  encoded.write(header.array());

  // Append the binary-encoded datum after the header bytes.
  final DatumWriter<GenericRecord> datumWriter = new SpecificDatumWriter<>(datum.getSchema());
  datumWriter.write(datum, EncoderFactory.get().directBinaryEncoder(encoded, null));

  final byte[] payload = encoded.toByteArray();
  final ParseException thrown = Assert.assertThrows(
      ParseException.class,
      () -> bytesDecoder.parse(ByteBuffer.wrap(payload))
  );
  Assert.assertTrue(thrown.getCause() instanceof AvroRuntimeException);
  Assert.assertTrue(thrown.getMessage().contains("Failed to read Avro message"));
}

static void assertInputRowCorrect(InputRow inputRow, List<String> expectedDimensions, boolean isFromPigAvro)
{
Assert.assertEquals(expectedDimensions, inputRow.getDimensions());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
Expand All @@ -29,10 +30,12 @@
import org.apache.druid.data.input.AvroStreamInputRowParserTest;
import org.apache.druid.data.input.SomeAvroDatum;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.junit.Assert;
import org.junit.Test;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

/**
Expand Down Expand Up @@ -87,4 +90,50 @@ public void testParse() throws Exception
GenericRecord actual = new InlineSchemaAvroBytesDecoder(schema).parse(ByteBuffer.wrap(out.toByteArray()));
Assert.assertEquals(someAvroDatum.get("id"), actual.get("id"));
}

/**
 * Verifies that the inline-schema decoder reports badly encoded input as a {@link ParseException}.
 *
 * The encoded datum is preceded by a 10-byte buffer whose first four bytes hold the int -1.
 * The test expects the "Failed to read Avro message" text and an {@link IOException} cause.
 */
@Test
public void testParseInvalidEncodedData() throws Exception
{
  final GenericRecord datum = AvroStreamInputRowParserTest.buildSomeAvroDatum();
  final Schema avroSchema = SomeAvroDatum.getClassSchema();

  // Leading bytes that are not part of a valid encoding of the datum.
  final ByteBuffer bogusPrefix = ByteBuffer.allocate(10);
  bogusPrefix.putInt(-1);

  final ByteArrayOutputStream encoded = new ByteArrayOutputStream();
  encoded.write(new byte[0]); // zero-length write; contributes no bytes
  encoded.write(bogusPrefix.array());

  // The correctly encoded datum follows the bogus prefix.
  final DatumWriter<GenericRecord> datumWriter = new SpecificDatumWriter<>(avroSchema);
  datumWriter.write(datum, EncoderFactory.get().directBinaryEncoder(encoded, null));

  final byte[] payload = encoded.toByteArray();
  final ParseException thrown = Assert.assertThrows(
      ParseException.class,
      () -> new InlineSchemaAvroBytesDecoder(avroSchema).parse(ByteBuffer.wrap(payload))
  );

  Assert.assertTrue(thrown.getMessage().contains("Failed to read Avro message"));
  Assert.assertTrue(thrown.getCause() instanceof IOException);
}

/**
 * Verifies that the inline-schema decoder surfaces an {@link AvroRuntimeException} as the cause of
 * a {@link ParseException}.
 *
 * The encoded datum is preceded by 20 zero bytes. The test expects the
 * "Failed to read Avro message" text and an {@link AvroRuntimeException} cause.
 */
@Test
public void testParseSmallInvalidChunk() throws Exception
{
  final GenericRecord datum = AvroStreamInputRowParserTest.buildSomeAvroDatum();
  final Schema avroSchema = SomeAvroDatum.getClassSchema();

  // Twenty zero bytes ahead of the real datum; a fresh byte array is zero-filled.
  final ByteArrayOutputStream encoded = new ByteArrayOutputStream();
  encoded.write(new byte[20]);

  final DatumWriter<GenericRecord> datumWriter = new SpecificDatumWriter<>(avroSchema);
  datumWriter.write(datum, EncoderFactory.get().directBinaryEncoder(encoded, null));

  final byte[] payload = encoded.toByteArray();
  final ParseException thrown = Assert.assertThrows(
      ParseException.class,
      () -> new InlineSchemaAvroBytesDecoder(avroSchema).parse(ByteBuffer.wrap(payload))
  );
  Assert.assertTrue(thrown.getMessage().contains("Failed to read Avro message"));
  Assert.assertTrue(thrown.getCause() instanceof AvroRuntimeException);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.druid.data.input.AvroStreamInputRowParserTest;
import org.apache.druid.data.input.SomeAvroDatum;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.junit.Assert;
import org.junit.Test;

Expand Down Expand Up @@ -106,4 +107,77 @@ public void testParse() throws Exception
).parse(ByteBuffer.wrap(out.toByteArray()));
Assert.assertEquals(someAvroDatum.get("id"), actual.get("id"));
}

/**
 * Verifies that the inline-schemas decoder rejects a record with an unexpected version.
 *
 * No version or schema-id prefix is written, so the payload starts directly with the encoded
 * datum. The test expects a {@link ParseException} mentioning "Found record of arbitrary version".
 */
@Test
public void testParseInvalidVersion() throws Exception
{
  final GenericRecord datum = AvroStreamInputRowParserTest.buildSomeAvroDatum();
  final Schema avroSchema = SomeAvroDatum.getClassSchema();

  // Only the datum is encoded; the header is deliberately left out.
  final ByteArrayOutputStream encoded = new ByteArrayOutputStream();
  final DatumWriter<GenericRecord> datumWriter = new SpecificDatumWriter<>(avroSchema);
  datumWriter.write(datum, EncoderFactory.get().directBinaryEncoder(encoded, null));

  final byte[] payload = encoded.toByteArray();
  final ParseException thrown = Assert.assertThrows(
      ParseException.class,
      () -> new InlineSchemasAvroBytesDecoder(ImmutableMap.of(10, avroSchema))
          .parse(ByteBuffer.wrap(payload))
  );
  Assert.assertTrue(thrown.getMessage().contains("Found record of arbitrary version"));
}

/**
 * Verifies that the inline-schemas decoder rejects a record whose schema id is not registered.
 *
 * The payload is a single byte with value 1 followed directly by the encoded datum, with no
 * explicit schema id in between. The decoder is configured with a schema under id 10 only, and the
 * test expects a {@link ParseException} mentioning "Failed to find schema for id".
 */
@Test
public void testParseInvalidSchemaId() throws Exception
{
  final GenericRecord datum = AvroStreamInputRowParserTest.buildSomeAvroDatum();
  final Schema avroSchema = SomeAvroDatum.getClassSchema();

  // Leading byte 1, then the datum; no schema id is written.
  final ByteArrayOutputStream encoded = new ByteArrayOutputStream();
  encoded.write(new byte[]{1});

  final DatumWriter<GenericRecord> datumWriter = new SpecificDatumWriter<>(avroSchema);
  datumWriter.write(datum, EncoderFactory.get().directBinaryEncoder(encoded, null));

  final byte[] payload = encoded.toByteArray();
  final ParseException thrown = Assert.assertThrows(
      ParseException.class,
      () -> new InlineSchemasAvroBytesDecoder(ImmutableMap.of(10, avroSchema))
          .parse(ByteBuffer.wrap(payload))
  );
  Assert.assertTrue(thrown.getMessage().contains("Failed to find schema for id"));
}

/**
 * Verifies that the inline-schemas decoder wraps a read failure in a {@link ParseException} that
 * names the schema id.
 *
 * The payload is: a byte with value 1, the int 10 (the id the decoder is configured with), 24 junk
 * bytes that begin with the int 777, and finally the encoded datum. The test expects the message
 * "Failed to read Avro message with schema id[10]".
 */
@Test
public void testParseInvalidData() throws Exception
{
  final GenericRecord datum = AvroStreamInputRowParserTest.buildSomeAvroDatum();
  final Schema avroSchema = SomeAvroDatum.getClassSchema();

  final byte[] schemaIdBytes = ByteBuffer.allocate(4).putInt(10).array();
  final byte[] junk = ByteBuffer.allocate(24).putInt(777).array();

  final ByteArrayOutputStream encoded = new ByteArrayOutputStream();
  encoded.write(new byte[]{1});
  encoded.write(schemaIdBytes);
  encoded.write(junk); // junk placed between the header and the datum

  final DatumWriter<GenericRecord> datumWriter = new SpecificDatumWriter<>(avroSchema);
  datumWriter.write(datum, EncoderFactory.get().directBinaryEncoder(encoded, null));

  final byte[] payload = encoded.toByteArray();
  final ParseException thrown = Assert.assertThrows(
      ParseException.class,
      () -> new InlineSchemasAvroBytesDecoder(ImmutableMap.of(10, avroSchema))
          .parse(ByteBuffer.wrap(payload))
  );
  Assert.assertTrue(thrown.getMessage().contains("Failed to read Avro message with schema id[10]"));
}
}

0 comments on commit ba6101a

Please sign in to comment.