Skip to content

Commit

Permalink
Add topic name as a column in the Kafka Input format (#14857)
Browse files Browse the repository at this point in the history
This PR adds a way to store the topic name in a column. Such a column can be used to distinguish messages coming from different topics in multi-topic ingestion.
  • Loading branch information
abhishekagarwal87 authored Aug 21, 2023
1 parent 9290605 commit a38b4f0
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class KafkaInputFormat implements InputFormat
{
private static final String DEFAULT_HEADER_COLUMN_PREFIX = "kafka.header.";
private static final String DEFAULT_TIMESTAMP_COLUMN_NAME = "kafka.timestamp";
private static final String DEFAULT_TOPIC_COLUMN_NAME = "kafka.topic";
private static final String DEFAULT_KEY_COLUMN_NAME = "kafka.key";
public static final String DEFAULT_AUTO_TIMESTAMP_STRING = "__kif_auto_timestamp";

Expand All @@ -54,14 +55,16 @@ public class KafkaInputFormat implements InputFormat
private final String headerColumnPrefix;
private final String keyColumnName;
private final String timestampColumnName;
private final String topicColumnName;

public KafkaInputFormat(
@JsonProperty("headerFormat") @Nullable KafkaHeaderFormat headerFormat,
@JsonProperty("keyFormat") @Nullable InputFormat keyFormat,
@JsonProperty("valueFormat") InputFormat valueFormat,
@JsonProperty("headerColumnPrefix") @Nullable String headerColumnPrefix,
@JsonProperty("keyColumnName") @Nullable String keyColumnName,
@JsonProperty("timestampColumnName") @Nullable String timestampColumnName
@JsonProperty("timestampColumnName") @Nullable String timestampColumnName,
@JsonProperty("topicColumnName") @Nullable String topicColumnName
)
{
this.headerFormat = headerFormat;
Expand All @@ -70,6 +73,7 @@ public KafkaInputFormat(
this.headerColumnPrefix = headerColumnPrefix != null ? headerColumnPrefix : DEFAULT_HEADER_COLUMN_PREFIX;
this.keyColumnName = keyColumnName != null ? keyColumnName : DEFAULT_KEY_COLUMN_NAME;
this.timestampColumnName = timestampColumnName != null ? timestampColumnName : DEFAULT_TIMESTAMP_COLUMN_NAME;
this.topicColumnName = topicColumnName != null ? topicColumnName : DEFAULT_TOPIC_COLUMN_NAME;
}

@Override
Expand Down Expand Up @@ -116,7 +120,8 @@ record ->
temporaryDirectory
),
keyColumnName,
timestampColumnName
timestampColumnName,
topicColumnName
);
}

Expand Down Expand Up @@ -161,6 +166,13 @@ public String getTimestampColumnName()
return timestampColumnName;
}

/**
 * Returns the name of the column under which the Kafka record's topic is exposed.
 * Defaults to {@code "kafka.topic"} ({@code DEFAULT_TOPIC_COLUMN_NAME}) when no
 * explicit name was supplied to the constructor.
 * Serialized by Jackson via {@code @JsonProperty}; marked {@code @Nullable} for the
 * serde contract, though the constructor always applies the non-null default.
 */
@Nullable
@JsonProperty
public String getTopicColumnName()
{
return topicColumnName;
}

@Override
public boolean equals(Object o)
{
Expand All @@ -176,14 +188,15 @@ public boolean equals(Object o)
&& Objects.equals(keyFormat, that.keyFormat)
&& Objects.equals(headerColumnPrefix, that.headerColumnPrefix)
&& Objects.equals(keyColumnName, that.keyColumnName)
&& Objects.equals(timestampColumnName, that.timestampColumnName);
&& Objects.equals(timestampColumnName, that.timestampColumnName)
&& Objects.equals(topicColumnName, that.topicColumnName);
}

@Override
public int hashCode()
{
return Objects.hash(headerFormat, valueFormat, keyFormat,
headerColumnPrefix, keyColumnName, timestampColumnName
headerColumnPrefix, keyColumnName, timestampColumnName, topicColumnName
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public class KafkaInputReader implements InputEntityReader
private final InputEntityReader valueParser;
private final String keyColumnName;
private final String timestampColumnName;
private final String topicColumnName;

/**
*
Expand All @@ -74,7 +75,8 @@ public KafkaInputReader(
@Nullable Function<KafkaRecordEntity, InputEntityReader> keyParserSupplier,
InputEntityReader valueParser,
String keyColumnName,
String timestampColumnName
String timestampColumnName,
String topicColumnName
)
{
this.inputRowSchema = inputRowSchema;
Expand All @@ -84,6 +86,7 @@ public KafkaInputReader(
this.valueParser = valueParser;
this.keyColumnName = keyColumnName;
this.timestampColumnName = timestampColumnName;
this.topicColumnName = topicColumnName;
}

@Override
Expand Down Expand Up @@ -128,6 +131,9 @@ private Map<String, Object> extractHeader(KafkaRecordEntity record)
// the header list
mergedHeaderMap.putIfAbsent(timestampColumnName, record.getRecord().timestamp());

// Add the Kafka record's topic to the merged map, only if the key doesn't already exist
mergedHeaderMap.putIfAbsent(topicColumnName, record.getRecord().topic());

return mergedHeaderMap;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public class KafkaInputFormatTest
{
private KafkaRecordEntity inputEntity;
private final long timestamp = DateTimes.of("2021-06-24").getMillis();
private static final String TOPIC = "sample";
private static final Iterable<Header> SAMPLE_HEADERS = ImmutableList.of(
new Header()
{
Expand Down Expand Up @@ -126,7 +127,8 @@ public void setUp()
),
"kafka.newheader.",
"kafka.newkey.key",
"kafka.newts.timestamp"
"kafka.newts.timestamp",
"kafka.newtopic.topic"
);
}

Expand Down Expand Up @@ -166,7 +168,8 @@ public void testSerde() throws JsonProcessingException
),
"kafka.newheader.",
"kafka.newkey.key",
"kafka.newts.timestamp"
"kafka.newts.timestamp",
"kafka.newtopic.topic"
);
Assert.assertEquals(format, kif);

Expand Down Expand Up @@ -209,7 +212,8 @@ public void testWithHeaderKeyAndValue() throws IOException
"foo",
"kafka.newheader.encoding",
"kafka.newheader.kafkapkc",
"kafka.newts.timestamp"
"kafka.newts.timestamp",
"kafka.newtopic.topic"
)
)
),
Expand All @@ -231,7 +235,8 @@ public void testWithHeaderKeyAndValue() throws IOException
"foo",
"kafka.newheader.encoding",
"kafka.newheader.kafkapkc",
"kafka.newts.timestamp"
"kafka.newts.timestamp",
"kafka.newtopic.topic"
),
row.getDimensions()
);
Expand All @@ -254,6 +259,10 @@ public void testWithHeaderKeyAndValue() throws IOException
String.valueOf(DateTimes.of("2021-06-24").getMillis()),
Iterables.getOnlyElement(row.getDimension("kafka.newts.timestamp"))
);
Assert.assertEquals(
TOPIC,
Iterables.getOnlyElement(row.getDimension("kafka.newtopic.topic"))
);
Assert.assertEquals(
"2021-06-25",
Iterables.getOnlyElement(row.getDimension("timestamp"))
Expand Down Expand Up @@ -302,7 +311,8 @@ public void testWithOutKey() throws IOException
"foo",
"kafka.newheader.encoding",
"kafka.newheader.kafkapkc",
"kafka.newts.timestamp"
"kafka.newts.timestamp",
"kafka.newtopic.topic"
)
)
),
Expand Down Expand Up @@ -478,7 +488,7 @@ public void testWithOutKeyAndHeaderSpecs() throws IOException
null, null, false, //make sure JsonReader is used
false, false
),
"kafka.newheader.", "kafka.newkey.", "kafka.newts."
"kafka.newheader.", "kafka.newkey.", "kafka.newts.", "kafka.newtopic."
);

final InputEntityReader reader = localFormat.createReader(
Expand All @@ -489,7 +499,8 @@ public void testWithOutKeyAndHeaderSpecs() throws IOException
ImmutableList.of(
"bar",
"foo",
"kafka.newts.timestamp"
"kafka.newts.timestamp",
"kafka.newtopic.topic"
)
)
),
Expand Down Expand Up @@ -567,7 +578,8 @@ public void testWithMultipleMixedRecords() throws IOException
"foo",
"kafka.newheader.encoding",
"kafka.newheader.kafkapkc",
"kafka.newts.timestamp"
"kafka.newts.timestamp",
"kafka.newtopic.topic"
)
)
),
Expand Down Expand Up @@ -613,6 +625,10 @@ public void testWithMultipleMixedRecords() throws IOException
String.valueOf(DateTimes.of("2021-06-24").getMillis()),
Iterables.getOnlyElement(row.getDimension("kafka.newts.timestamp"))
);
Assert.assertEquals(
TOPIC,
Iterables.getOnlyElement(row.getDimension("kafka.newtopic.topic"))
);
Assert.assertEquals(String.valueOf(i), Iterables.getOnlyElement(row.getDimension("kafka.newheader.indexH")));


Expand Down Expand Up @@ -669,7 +685,8 @@ public void testMissingTimestampThrowsException() throws IOException
"foo",
"kafka.newheader.encoding",
"kafka.newheader.kafkapkc",
"kafka.newts.timestamp"
"kafka.newts.timestamp",
"kafka.newtopic.topic"
)
)
),
Expand All @@ -683,7 +700,8 @@ public void testMissingTimestampThrowsException() throws IOException
while (iterator.hasNext()) {
Throwable t = Assert.assertThrows(ParseException.class, () -> iterator.next());
Assert.assertEquals(
"Timestamp[null] is unparseable! Event: {foo=x, kafka.newts.timestamp=1624492800000, kafka.newkey.key=sampleKey, root_baz=4, bar=null, kafka...",
"Timestamp[null] is unparseable! Event: {kafka.newtopic.topic=sample, foo=x, kafka.newts"
+ ".timestamp=1624492800000, kafka.newkey.key=sampleKey...",
t.getMessage()
);
}
Expand Down Expand Up @@ -733,6 +751,7 @@ public void testWithSchemaDiscovery() throws IOException
final InputRow row = iterator.next();
Assert.assertEquals(
Arrays.asList(
"kafka.newtopic.topic",
"foo",
"kafka.newts.timestamp",
"kafka.newkey.key",
Expand Down Expand Up @@ -767,6 +786,10 @@ public void testWithSchemaDiscovery() throws IOException
String.valueOf(DateTimes.of("2021-06-24").getMillis()),
Iterables.getOnlyElement(row.getDimension("kafka.newts.timestamp"))
);
Assert.assertEquals(
TOPIC,
Iterables.getOnlyElement(row.getDimension("kafka.newtopic.topic"))
);
Assert.assertEquals(
"2021-06-25",
Iterables.getOnlyElement(row.getDimension("timestamp"))
Expand Down Expand Up @@ -834,6 +857,7 @@ public void testWithPartialDeclarationSchemaDiscovery() throws IOException
Arrays.asList(
"bar",
"kafka.newheader.kafkapkc",
"kafka.newtopic.topic",
"foo",
"kafka.newts.timestamp",
"kafka.newkey.key",
Expand Down Expand Up @@ -866,6 +890,10 @@ public void testWithPartialDeclarationSchemaDiscovery() throws IOException
String.valueOf(DateTimes.of("2021-06-24").getMillis()),
Iterables.getOnlyElement(row.getDimension("kafka.newts.timestamp"))
);
Assert.assertEquals(
TOPIC,
Iterables.getOnlyElement(row.getDimension("kafka.newtopic.topic"))
);
Assert.assertEquals(
"2021-06-25",
Iterables.getOnlyElement(row.getDimension("timestamp"))
Expand All @@ -889,7 +917,7 @@ private KafkaRecordEntity makeInputEntity(byte[] key, byte[] payload, Headers he
{
return new KafkaRecordEntity(
new ConsumerRecord<>(
"sample",
TOPIC,
0,
0,
timestamp,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ public byte[] value()
new KafkaStringHeaderFormat(null),
INPUT_FORMAT,
INPUT_FORMAT,
"kafka.testheader.", "kafka.key", "kafka.timestamp"
"kafka.testheader.", "kafka.key", "kafka.timestamp", "kafka.topic"
);

private static TestingCluster zkServer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ public void testSampleKafkaInputFormat()
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
null,
null,
null,
null
),

Expand Down

0 comments on commit a38b4f0

Please sign in to comment.