Honor ignoreInvalidRows in reducer of Hadoop indexer #1226
Conversation
Other data solutions have a setting for how many input rows can be bad before a job fails, and often have a way to store the bad rows in their own output so they can be cleaned up later. Is this expected to be bad because the row had a read error or because the data itself is corrupt?
How to deal with the bad rows properly depends on the data set and individual requirements, and so it should be configurable. At minimum though, if they're set to be ignored, I think it's a good idea to log them somewhere (perhaps a file?) for debugging purposes. For my particular use case, there's nothing I can do with the bad rows other than ignore them, since some of the fields in the log lines are externally generated. My change in this PR is to simply honor the ignoreInvalidRows setting.
numRows = index.add(inputRow);
} catch (ParseException e) {
  if (config.isIgnoreInvalidRows()) {
    log.info("Ignoring invalid row [%s] due to parsing error: %s", value.toString(), e.getMessage());
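The control flow in the diff above can be sketched in a self-contained way. The class below is a minimal, hypothetical stand-in for the reducer path: `addRow` plays the role of `index.add` (it throws when a column value is malformed), and the `ignoreInvalidRows` flag mirrors `config.isIgnoreInvalidRows()`; none of these names are Druid's actual API.

```java
// Minimal sketch of the skip-on-parse-error pattern from the diff above.
// addRow and the sample rows are hypothetical stand-ins for index.add and
// real input records; only the control flow matches the PR.
import java.util.Arrays;
import java.util.List;

public class IgnoreInvalidRowsSketch {
    static boolean ignoreInvalidRows = true;   // mirrors config.isIgnoreInvalidRows()
    static int added = 0;
    static int skipped = 0;

    // Stand-in for index.add(inputRow): throws on a malformed column value.
    static void addRow(String row) {
        String[] cols = row.split(",");
        Integer.parseInt(cols[1].trim());      // column 1 must be an integer
        added++;
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList("a, 1", "b, oops", "c, 3");
        for (String row : rows) {
            try {
                addRow(row);
            } catch (NumberFormatException e) {
                if (ignoreInvalidRows) {
                    skipped++;                 // reducer would log and continue
                } else {
                    throw e;                   // preserve the old fail-fast behavior
                }
            }
        }
        System.out.println(added + " added, " + skipped + " skipped");
    }
}
```

With the flag enabled, the one bad row is counted and skipped instead of failing the whole job.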
Probably should make this a warn, also put the error as the first argument like this:
log.info(e, "Ignoring invalid row [%s] due to parsing error", value.toString());
Updated in the new commit.
@dkharrat: can you make the title of the PR something more like "Honor ignoreInvalidRows in reducer of Hadoop indexer"? This helps signify this as a bug rather than a feature enhancement.
@dkharrat this code is already there in HadoopDruidIndexerMapper, which honors the "ignoreInvalidRows" configuration. The reducer will not even get the record if there was a problem with parsing it.
@drcrallen done, I updated the title. @himadrisingh001 yes, the mapper also ignores malformed records, but it only parses the line based on the ParseSpec and doesn't parse the individual column values. In my case, I had a perfectly valid CSV line (so the mapper succeeded in parsing it), but the column value was invalid (an integer was expected for a particular column, but got a string instead), so when the index tries to parse the column value, an exception is thrown. My change handles that case.
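The mapper-vs-reducer distinction described above can be illustrated with a hypothetical two-stage check: stage 1 validates only the line's structure (roughly what the mapper's ParseSpec-level parse catches), while stage 2 converts a column value (roughly where `index.add` fails). `structurallyValid` and `parseValueColumn` are illustrative stand-ins, not Druid methods.

```java
// Hypothetical two-stage parse: a line can pass the structural check
// (mapper) yet still fail value conversion (reducer/index).
public class TwoStageParseSketch {
    // Stage 1 (mapper-like): is this a well-formed 3-column CSV line?
    static boolean structurallyValid(String line) {
        return line.split(",", -1).length == 3;
    }

    // Stage 2 (reducer/index-like): column 1 must hold an integer.
    static int parseValueColumn(String line) {
        return Integer.parseInt(line.split(",", -1)[1].trim());
    }

    public static void main(String[] args) {
        String line = "2015-04-01, notAnInt, host1";   // valid CSV, bad value
        System.out.println("structure ok: " + structurallyValid(line));
        try {
            parseValueColumn(line);
            System.out.println("value ok");
        } catch (NumberFormatException e) {
            System.out.println("value parse failed");  // only the reducer sees this
        }
    }
}
```

This is why the mapper's existing ignoreInvalidRows handling is not sufficient on its own: the bad value only surfaces in the reducer.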
@@ -382,8 +383,18 @@ protected void reduce(
context.progress();
final InputRow inputRow = index.formatRow(parser.parse(value.toString()));
Can you move the try block to start up here? parser.parse can also throw ParseException
Thanks @dkharrat, that sounds reasonable. I agree with @drcrallen on including parser.parse inside the try-catch block.
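The widened try block the reviewers are asking for can be sketched as follows: both the parse step and the add step sit inside one try, so a structural parse failure is handled the same way as a bad column value. `parse` and `add` here are hypothetical stand-ins for `parser.parse` and `index.add`.

```java
// Sketch of the widened try block: parse() and add() can both throw,
// so both calls live inside one try. The methods are stand-ins, not
// Druid's actual parser/index API.
public class WidenedTrySketch {
    static int failures = 0;

    static String parse(String line) {               // stand-in for parser.parse
        if (line.isEmpty()) throw new IllegalArgumentException("empty line");
        return line;
    }

    static void add(String row) {                    // stand-in for index.add
        if (row.contains("bad")) throw new IllegalArgumentException("bad value");
    }

    public static void main(String[] args) {
        for (String line : new String[]{"", "ok", "bad"}) {
            try {
                // parse is inside the try, so a structural error is also skipped
                add(parse(line));
            } catch (IllegalArgumentException e) {
                failures++;                          // with ignoreInvalidRows: log and continue
            }
        }
        System.out.println(failures + " rows skipped");
    }
}
```

Both the empty line (parse failure) and the bad value (add failure) are caught by the same handler.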
(force-pushed f0a54ce to e908d9d)
@dkharrat Thanks for the contrib! Do you mind signing our CLA: http://druid.io/community/cla.html
👍
@@ -77,6 +80,7 @@ protected void map(
}
catch (Exception e) {
  if (config.isIgnoreInvalidRows()) {
    log.warn(e, "Ignoring invalid row [%s] due to parsing error", value.toString());
this can potentially log a tremendous amount of errors. Can we cap the logging at something reasonable? Default = 10 or something
I have mixed feelings about that limit.
In our architecture, the realtime is treated as "Probably right" and the batch fixup is treated as "definitely right"
As such, if the data itself is bad, it should probably be cleaned up externally unless the user is really OK with completely losing data in Druid.
Thinking a bit more about it, I can see two major ways of looking at it:
1. ignoreInvalidRows is an undocumented option for people who are intentionally trying to lose data. In which case either printing out no logs or printing out only a few lines would be OK.
2. ignoreInvalidRows is a "feature" whereby bad rows can go to an ephemeral something(tm) where they can be fixed and recovered.
But officially supporting ignoreInvalidRows feels like it breaks the Lambda assumption that batch fixup gets all the data. As such, I'm suggesting 1, which is the direction this PR is going anyways.
Either way, it is good to have ignoreInvalidRows. In some of our pipelines we are getting data from other systems which might be bad, and we don't care about losing that portion.
It would be too bad if this feature weren't there and we had to do one full pass over the data just to filter out the invalid rows before ingesting it into Druid; that effectively increases our end-to-end ingestion time.
Maybe the simple solution here is to just change the log.warn to log.debug so that it gets into the logs only if the user really wants it, in addition to the INVALID_ROW_COUNTER.
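The compromise proposed above (always count, log the row itself only at debug level) can be sketched with plain java.util.logging, where FINE plays the role of debug. The counter name matches the one mentioned in the comment, but the Logger usage and method names are illustrative, not Druid's actual code.

```java
// Sketch of "count always, log row contents only at debug": the counter is
// cheap and unconditional, the per-row message is gated on the debug level.
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;

public class InvalidRowAccounting {
    static final Logger log = Logger.getLogger("indexer");
    static final AtomicLong INVALID_ROW_COUNTER = new AtomicLong();

    static void recordInvalidRow(String row, Exception cause) {
        INVALID_ROW_COUNTER.incrementAndGet();   // always counted
        if (log.isLoggable(Level.FINE)) {        // FINE ~= debug; off by default
            log.log(Level.FINE, "Ignoring invalid row [" + row + "]", cause);
        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            recordInvalidRow("row-" + i, new RuntimeException("parse error"));
        }
        System.out.println("invalid rows: " + INVALID_ROW_COUNTER.get());
    }
}
```

At the default log level the job output stays quiet no matter how many rows are bad, while the counter still surfaces the total.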
@himanshug I like that solution.
The reducer of the hadoop indexer now ignores lines with parsing exceptions (if enabled by the indexer config).
@himanshug I like your idea of using the debug level to log invalid lines. I just updated the commit with this change.
@fjy sure, I just submitted the CLA form.
Honor ignoreInvalidRows in reducer of Hadoop indexer
In the reducer of the Hadoop indexer, exceptions are sometimes thrown for invalid column values. This change ignores those lines if the 'ignoreInvalidRows' flag is enabled, similar to the mapper.
Without this change, the Hadoop indexer job would fail due to a single corrupt line.