Skip to content

Commit

Permalink
feat: add matrix type parameter and improve auto logic (#1052)
Browse files Browse the repository at this point in the history
  • Loading branch information
imatiach-msft authored May 17, 2021
1 parent 03b8b7d commit 12cea2d
Show file tree
Hide file tree
Showing 8 changed files with 61 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,10 @@ trait LightGBMBase[TrainedModel <: Model[TrainedModel]] extends Estimator[Traine
DartModeParams(getDropRate, getMaxDrop, getSkipDrop, getXGBoostDartMode, getUniformDrop)
}

/**
 * Collects the execution-related settings of this learner into a single
 * ExecutionParams value, built from the current chunk size and matrix type getters.
 * @return The execution parameters.
 */
protected def getExecutionParams(): ExecutionParams = {
  val configuredChunkSize = getChunkSize
  val configuredMatrixType = getMatrixType
  ExecutionParams(configuredChunkSize, configuredMatrixType)
}

/**
* Inner train method for LightGBM learners. Calculates the number of workers,
* creates a driver thread, and runs mapPartitions on the dataset.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class LightGBMClassifier(override val uid: String)
getIsUnbalance, getVerbosity, categoricalIndexes, actualNumClasses, getBoostFromAverage,
getBoostingType, getLambdaL1, getLambdaL2, getIsProvideTrainingMetric,
getMetric, getMinGainToSplit, getMaxDeltaStep, getMaxBinByFeature, getMinDataInLeaf, getSlotNames,
getDelegate, getChunkSize, getDartParams())
getDelegate, getDartParams(), getExecutionParams())
}

def getModel(trainParams: TrainParams, lightGBMBooster: LightGBMBooster): LightGBMClassificationModel = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,14 @@ trait LightGBMExecutionParams extends Wrappable {

/** Reads the chunkSize param, using its default when no explicit value was set. */
def getChunkSize: Int = getOrDefault(chunkSize)

/** Stores a new value for the chunkSize param and returns this instance for chaining. */
def setChunkSize(value: Int): this.type = {
  set(chunkSize, value)
}

/**
 * Advanced parameter selecting whether the native LightGBM matrix is built as sparse or dense.
 * Accepted values are "auto", "sparse" and "dense"; the default is "auto".
 * Any other value is rejected by the param's validator when it is set, instead of
 * surfacing later as a failure while the dataset is being generated.
 */
val matrixType = new Param[String](this, "matrixType",
  "Advanced parameter to specify whether the native lightgbm matrix constructed should be sparse or dense. " +
  "Values can be auto, sparse or dense. Default value is auto, which samples first ten rows to determine type.",
  (value: String) => Set("auto", "sparse", "dense").contains(value))
setDefault(matrixType -> "auto")

/** Returns the configured matrix type, or the default "auto" when none was set. */
def getMatrixType: String = $(matrixType)

/**
 * Sets the matrix type.
 * @param value One of "auto", "sparse" or "dense".
 * @return This instance, so that setter calls can be chained.
 */
def setMatrixType(value: String): this.type = set(matrixType, value)
}

/** Defines common parameters across all LightGBM learners related to learning score evolution.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class LightGBMRanker(override val uid: String)
getFeatureFraction, getMaxDepth, getMinSumHessianInLeaf, numTasks, modelStr,
getVerbosity, categoricalIndexes, getBoostingType, getLambdaL1, getLambdaL2, getMaxPosition, getLabelGain,
getIsProvideTrainingMetric, getMetric, getEvalAt, getMinGainToSplit, getMaxDeltaStep,
getMaxBinByFeature, getMinDataInLeaf, getSlotNames, getDelegate, getChunkSize, getDartParams())
getMaxBinByFeature, getMinDataInLeaf, getSlotNames, getDelegate, getDartParams(), getExecutionParams())
}

def getModel(trainParams: TrainParams, lightGBMBooster: LightGBMBooster): LightGBMRankerModel = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class LightGBMRegressor(override val uid: String)
getEarlyStoppingRound, getImprovementTolerance, getFeatureFraction, getMaxDepth, getMinSumHessianInLeaf,
numTasks, modelStr, getVerbosity, categoricalIndexes, getBoostFromAverage, getBoostingType, getLambdaL1,
getLambdaL2, getIsProvideTrainingMetric, getMetric, getMinGainToSplit, getMaxDeltaStep,
getMaxBinByFeature, getMinDataInLeaf, getSlotNames, getDelegate, getChunkSize, getDartParams())
getMaxBinByFeature, getMinDataInLeaf, getSlotNames, getDelegate, getDartParams(), getExecutionParams())
}

def getModel(trainParams: TrainParams, lightGBMBooster: LightGBMBooster): LightGBMRegressionModel = {
Expand Down
10 changes: 6 additions & 4 deletions src/main/scala/com/microsoft/ml/spark/lightgbm/TrainParams.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ abstract class TrainParams extends Serializable {
def minDataInLeaf: Int
def featureNames: Array[String]
def delegate: Option[LightGBMDelegate]
def chunkSize: Int
def dartModeParams: DartModeParams
def executionParams: ExecutionParams

override def toString: String = {
// Since passing `isProvideTrainingMetric` to LightGBM as a config parameter won't work,
Expand Down Expand Up @@ -75,7 +75,7 @@ case class ClassifierTrainParams(parallelism: String, topK: Int, numIterations:
isProvideTrainingMetric: Boolean, metric: String, minGainToSplit: Double,
maxDeltaStep: Double, maxBinByFeature: Array[Int], minDataInLeaf: Int,
featureNames: Array[String], delegate: Option[LightGBMDelegate],
chunkSize: Int, dartModeParams: DartModeParams)
dartModeParams: DartModeParams, executionParams: ExecutionParams)
extends TrainParams {
override def toString(): String = {
val extraStr =
Expand All @@ -100,7 +100,7 @@ case class RegressorTrainParams(parallelism: String, topK: Int, numIterations: I
isProvideTrainingMetric: Boolean, metric: String, minGainToSplit: Double,
maxDeltaStep: Double, maxBinByFeature: Array[Int], minDataInLeaf: Int,
featureNames: Array[String], delegate: Option[LightGBMDelegate],
chunkSize: Int, dartModeParams: DartModeParams)
dartModeParams: DartModeParams, executionParams: ExecutionParams)
extends TrainParams {
override def toString(): String = {
s"alpha=$alpha tweedie_variance_power=$tweedieVariancePower boost_from_average=${boostFromAverage.toString} " +
Expand All @@ -122,7 +122,7 @@ case class RankerTrainParams(parallelism: String, topK: Int, numIterations: Int,
metric: String, evalAt: Array[Int], minGainToSplit: Double,
maxDeltaStep: Double, maxBinByFeature: Array[Int], minDataInLeaf: Int,
featureNames: Array[String], delegate: Option[LightGBMDelegate],
chunkSize: Int, dartModeParams: DartModeParams)
dartModeParams: DartModeParams, executionParams: ExecutionParams)
extends TrainParams {
override def toString(): String = {
val labelGainStr =
Expand All @@ -142,3 +142,5 @@ case class DartModeParams(dropRate: Double, maxDrop: Int, skipDrop: Double,
s"uniform_drop=$uniformDrop "
}
}

/**
 * Execution-related settings handed to the training code.
 * @param chunkSize  Size used when allocating the chunked arrays that buffer streamed dense data.
 * @param matrixType Either "auto", "sparse" or "dense"; decides how the native dataset is built.
 */
case class ExecutionParams(
  chunkSize: Int,
  matrixType: String
) extends Serializable
43 changes: 35 additions & 8 deletions src/main/scala/com/microsoft/ml/spark/lightgbm/TrainUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,23 @@ private object TrainUtils extends Serializable {
def generateDataset(rowsIter: Iterator[Row], columnParams: ColumnParams,
referenceDataset: Option[LightGBMDataset], schema: StructType,
log: Logger, trainParams: TrainParams): Option[LightGBMDataset] = {
val hrow = rowsIter.next()
val (concatRowsIter: Iterator[Row], isSparse: Boolean) =
if (trainParams.executionParams.matrixType == "auto") {
sampleRowsForArrayType(rowsIter, schema, columnParams)
} else if (trainParams.executionParams.matrixType == "sparse") {
(rowsIter: Iterator[Row], true)
} else if (trainParams.executionParams.matrixType == "dense") {
(rowsIter: Iterator[Row], false)
} else {
throw new Exception(s"Invalid parameter matrix type specified: ${trainParams.executionParams.matrixType}")
}
var datasetPtr: Option[LightGBMDataset] = None
if (hrow.get(schema.fieldIndex(columnParams.featuresColumn)).isInstanceOf[DenseVector]) {
datasetPtr = aggregateDenseStreamedData(hrow, rowsIter, columnParams, referenceDataset, schema, log, trainParams)
if (!isSparse) {
datasetPtr = aggregateDenseStreamedData(concatRowsIter, columnParams, referenceDataset, schema, log, trainParams)
// Validate generated dataset has the correct number of rows and cols
datasetPtr.get.validateDataset()
} else {
val rows = (Iterator[Row](hrow) ++ rowsIter).toArray
val rows = concatRowsIter.toArray
val numRows = rows.length
val labels = rows.map(row => row.getDouble(schema.fieldIndex(columnParams.labelColumn)))
val rowsAsSparse = rows.map(row => row.get(schema.fieldIndex(columnParams.featuresColumn)) match {
Expand All @@ -60,6 +69,24 @@ private object TrainUtils extends Serializable {
datasetPtr
}

/**
 * Samples up to the first ten rows to decide whether a sparse or a dense matrix
 * should be constructed in the lightgbm native code.
 *
 * A row counts as dense when its features column holds a DenseVector; every other
 * value counts as sparse. The result is sparse only when sparse rows strictly
 * outnumber dense rows in the sample, so a tie or an empty iterator yields dense.
 *
 * @param rowsIter Iterator of rows. It is advanced by the sampled rows and must only be
 *                 consumed through the returned iterator afterwards.
 * @param schema The schema.
 * @param columnParams The column parameters.
 * @return An iterator yielding the sampled rows followed by the remaining rows, in the
 *         original order, and whether the matrix should be sparse (true) or dense (false).
 */
def sampleRowsForArrayType(rowsIter: Iterator[Row], schema: StructType,
                           columnParams: ColumnParams): (Iterator[Row], Boolean) = {
  val numSampledRows = 10
  // Resolved lazily so that an empty iterator never triggers the column lookup.
  lazy val featuresIndex = schema.fieldIndex(columnParams.featuresColumn)
  // Only hasNext/next are used to draw the sample: an iterator should not be reused
  // after a transformation such as take() has been called on it, and the remaining
  // rows of rowsIter are still needed below.
  val sampleBuilder = Vector.newBuilder[Row]
  var numSampled = 0
  while (numSampled < numSampledRows && rowsIter.hasNext) {
    sampleBuilder += rowsIter.next()
    numSampled += 1
  }
  val sampleRows = sampleBuilder.result()
  val numDense = sampleRows.count { row =>
    row.get(featuresIndex) match {
      case _: DenseVector => true
      case _ => false
    }
  }
  val numSparse = sampleRows.length - numDense
  // recreate the iterator: sampled rows first, then whatever is left in rowsIter
  (sampleRows.iterator ++ rowsIter, numSparse > numDense)
}

def getRowAsDoubleArray(row: Row, columnParams: ColumnParams, schema: StructType): Array[Double] = {
row.get(schema.fieldIndex(columnParams.featuresColumn)) match {
case dense: DenseVector => dense.toArray
Expand Down Expand Up @@ -108,20 +135,20 @@ private object TrainUtils extends Serializable {
initScoreChunkedArrayOpt.foreach(_.release())
}

def aggregateDenseStreamedData(hrow: Row, rowsIter: Iterator[Row], columnParams: ColumnParams,
def aggregateDenseStreamedData(rowsIter: Iterator[Row], columnParams: ColumnParams,
referenceDataset: Option[LightGBMDataset], schema: StructType,
log: Logger, trainParams: TrainParams): Option[LightGBMDataset] = {
var numRows = 0
val chunkSize = trainParams.chunkSize
val chunkSize = trainParams.executionParams.chunkSize
val labelsChunkedArray = new floatChunkedArray(chunkSize)
val weightChunkedArrayOpt = columnParams.weightColumn.map { _ => new floatChunkedArray(chunkSize) }
val initScoreChunkedArrayOpt = columnParams.initScoreColumn.map { _ => new doubleChunkedArray(chunkSize) }
var featuresChunkedArrayOpt: Option[doubleChunkedArray] = None
val groupColumnValues: ListBuffer[Row] = new ListBuffer[Row]()
try {
var numCols = 0
while (rowsIter.hasNext || numRows == 0) {
val row = if (numRows == 0) hrow else rowsIter.next()
while (rowsIter.hasNext) {
val row = rowsIter.next()
numRows += 1
labelsChunkedArray.add(row.getDouble(schema.fieldIndex(columnParams.labelColumn)).toFloat)
columnParams.weightColumn.map { col =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,12 +320,15 @@ class VerifyLightGBMClassifier extends Benchmarks with EstimatorFuzzing[LightGBM
test("Verify LightGBM Classifier with dart mode parameters") {
// Assert the dart parameters work without failing and setting them to tuned values improves performance
val Array(train, test) = pimaDF.randomSplit(Array(0.8, 0.2), seed)
val scoredDF1 = baseModel.setBoostingType("dart").fit(train).transform(test)
val scoredDF1 = baseModel.setBoostingType("dart").
setMaxDrop(1)
.setSkipDrop(0.9)
.fit(train).transform(test)
val scoredDF2 = baseModel.setBoostingType("dart")
.setXGBoostDartMode(true)
.setDropRate(0.6)
.setMaxDrop(60)
.setSkipDrop(0.6)
.setSkipDrop(0.4)
.setUniformDrop(true)
.fit(train).transform(test)
assertBinaryImprovement(scoredDF1, scoredDF2)
Expand Down

0 comments on commit 12cea2d

Please sign in to comment.