From 12cea2df9e479077813b611c1b098ca39b1a3133 Mon Sep 17 00:00:00 2001
From: Ilya Matiach
Date: Mon, 17 May 2021 18:06:26 -0400
Subject: [PATCH] feat: add matrix type parameter and improve auto logic (#1052)

---
 .../ml/spark/lightgbm/LightGBMBase.scala      |  4 ++
 .../spark/lightgbm/LightGBMClassifier.scala   |  2 +-
 .../ml/spark/lightgbm/LightGBMParams.scala    |  8 ++++
 .../ml/spark/lightgbm/LightGBMRanker.scala    |  2 +-
 .../ml/spark/lightgbm/LightGBMRegressor.scala |  2 +-
 .../ml/spark/lightgbm/TrainParams.scala       | 10 +++--
 .../ml/spark/lightgbm/TrainUtils.scala        | 43 +++++++++++++++----
 .../split1/VerifyLightGBMClassifier.scala     |  7 ++-
 8 files changed, 61 insertions(+), 17 deletions(-)

diff --git a/src/main/scala/com/microsoft/ml/spark/lightgbm/LightGBMBase.scala b/src/main/scala/com/microsoft/ml/spark/lightgbm/LightGBMBase.scala
index 9aedbf9b01..8f55c954fa 100644
--- a/src/main/scala/com/microsoft/ml/spark/lightgbm/LightGBMBase.scala
+++ b/src/main/scala/com/microsoft/ml/spark/lightgbm/LightGBMBase.scala
@@ -184,6 +184,10 @@ trait LightGBMBase[TrainedModel <: Model[TrainedModel]] extends Estimator[Traine
     DartModeParams(getDropRate, getMaxDrop, getSkipDrop, getXGBoostDartMode, getUniformDrop)
   }
 
+  protected def getExecutionParams(): ExecutionParams = {
+    ExecutionParams(getChunkSize, getMatrixType)
+  }
+
   /**
     * Inner train method for LightGBM learners. Calculates the number of workers,
     * creates a driver thread, and runs mapPartitions on the dataset.
diff --git a/src/main/scala/com/microsoft/ml/spark/lightgbm/LightGBMClassifier.scala b/src/main/scala/com/microsoft/ml/spark/lightgbm/LightGBMClassifier.scala
index c04e697e19..c87daab7ff 100644
--- a/src/main/scala/com/microsoft/ml/spark/lightgbm/LightGBMClassifier.scala
+++ b/src/main/scala/com/microsoft/ml/spark/lightgbm/LightGBMClassifier.scala
@@ -50,7 +50,7 @@ class LightGBMClassifier(override val uid: String)
       getIsUnbalance, getVerbosity, categoricalIndexes, actualNumClasses, getBoostFromAverage,
       getBoostingType, getLambdaL1, getLambdaL2, getIsProvideTrainingMetric, getMetric,
       getMinGainToSplit, getMaxDeltaStep, getMaxBinByFeature, getMinDataInLeaf, getSlotNames,
-      getDelegate, getChunkSize, getDartParams())
+      getDelegate, getDartParams(), getExecutionParams())
   }
 
   def getModel(trainParams: TrainParams, lightGBMBooster: LightGBMBooster): LightGBMClassificationModel = {
diff --git a/src/main/scala/com/microsoft/ml/spark/lightgbm/LightGBMParams.scala b/src/main/scala/com/microsoft/ml/spark/lightgbm/LightGBMParams.scala
index bece8147d2..a37d8b9426 100644
--- a/src/main/scala/com/microsoft/ml/spark/lightgbm/LightGBMParams.scala
+++ b/src/main/scala/com/microsoft/ml/spark/lightgbm/LightGBMParams.scala
@@ -86,6 +86,14 @@ trait LightGBMExecutionParams extends Wrappable {
 
   def getChunkSize: Int = $(chunkSize)
   def setChunkSize(value: Int): this.type = set(chunkSize, value)
+
+  val matrixType = new Param[String](this, "matrixType",
+    "Advanced parameter to specify whether the native lightgbm matrix constructed should be sparse or dense. " +
+      "Values can be auto, sparse or dense. Default value is auto, which samples first ten rows to determine type.")
+  setDefault(matrixType -> "auto")
+
+  def getMatrixType: String = $(matrixType)
+  def setMatrixType(value: String): this.type = set(matrixType, value)
 }
 
 /** Defines common parameters across all LightGBM learners related to learning score evolution.
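For reference, a minimal usage sketch of the new matrixType parameter defined above. This is not part of the patch: it assumes a SparkSession and a DataFrame df with standard "features" and "label" columns.

    import com.microsoft.ml.spark.lightgbm.LightGBMClassifier

    // "auto" (the default) samples the first ten rows of each partition and
    // picks sparse when sparse rows outnumber dense ones; "sparse" or "dense"
    // forces the native matrix representation and skips the sampling.
    val model = new LightGBMClassifier()
      .setLabelCol("label")
      .setFeaturesCol("features")
      .setMatrixType("auto")
      .fit(df)

Forcing "dense" or "sparse" avoids the sampling pass when the feature representation is known up front.
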
diff --git a/src/main/scala/com/microsoft/ml/spark/lightgbm/LightGBMRanker.scala b/src/main/scala/com/microsoft/ml/spark/lightgbm/LightGBMRanker.scala
index ac2cbd6793..f020b2a357 100644
--- a/src/main/scala/com/microsoft/ml/spark/lightgbm/LightGBMRanker.scala
+++ b/src/main/scala/com/microsoft/ml/spark/lightgbm/LightGBMRanker.scala
@@ -56,7 +56,7 @@ class LightGBMRanker(override val uid: String)
       getFeatureFraction, getMaxDepth, getMinSumHessianInLeaf, numTasks, modelStr,
       getVerbosity, categoricalIndexes, getBoostingType, getLambdaL1, getLambdaL2, getMaxPosition, getLabelGain,
       getIsProvideTrainingMetric, getMetric, getEvalAt, getMinGainToSplit, getMaxDeltaStep,
-      getMaxBinByFeature, getMinDataInLeaf, getSlotNames, getDelegate, getChunkSize, getDartParams())
+      getMaxBinByFeature, getMinDataInLeaf, getSlotNames, getDelegate, getDartParams(), getExecutionParams())
   }
 
   def getModel(trainParams: TrainParams, lightGBMBooster: LightGBMBooster): LightGBMRankerModel = {
diff --git a/src/main/scala/com/microsoft/ml/spark/lightgbm/LightGBMRegressor.scala b/src/main/scala/com/microsoft/ml/spark/lightgbm/LightGBMRegressor.scala
index ddc4ff55b1..03813add45 100644
--- a/src/main/scala/com/microsoft/ml/spark/lightgbm/LightGBMRegressor.scala
+++ b/src/main/scala/com/microsoft/ml/spark/lightgbm/LightGBMRegressor.scala
@@ -63,7 +63,7 @@ class LightGBMRegressor(override val uid: String)
       getEarlyStoppingRound, getImprovementTolerance, getFeatureFraction, getMaxDepth, getMinSumHessianInLeaf,
       numTasks, modelStr, getVerbosity, categoricalIndexes, getBoostFromAverage, getBoostingType,
       getLambdaL1, getLambdaL2, getIsProvideTrainingMetric, getMetric, getMinGainToSplit, getMaxDeltaStep,
-      getMaxBinByFeature, getMinDataInLeaf, getSlotNames, getDelegate, getChunkSize, getDartParams())
+      getMaxBinByFeature, getMinDataInLeaf, getSlotNames, getDelegate, getDartParams(), getExecutionParams())
   }
 
   def getModel(trainParams: TrainParams, lightGBMBooster: LightGBMBooster): LightGBMRegressionModel = {
diff --git a/src/main/scala/com/microsoft/ml/spark/lightgbm/TrainParams.scala b/src/main/scala/com/microsoft/ml/spark/lightgbm/TrainParams.scala
index 62c6875a37..dfd787d861 100644
--- a/src/main/scala/com/microsoft/ml/spark/lightgbm/TrainParams.scala
+++ b/src/main/scala/com/microsoft/ml/spark/lightgbm/TrainParams.scala
@@ -39,8 +39,8 @@ abstract class TrainParams extends Serializable {
   def minDataInLeaf: Int
   def featureNames: Array[String]
   def delegate: Option[LightGBMDelegate]
-  def chunkSize: Int
   def dartModeParams: DartModeParams
+  def executionParams: ExecutionParams
 
   override def toString: String = {
     // Since passing `isProvideTrainingMetric` to LightGBM as a config parameter won't work,
@@ -75,7 +75,7 @@ case class ClassifierTrainParams(parallelism: String, topK: Int, numIterations:
                                  isProvideTrainingMetric: Boolean, metric: String, minGainToSplit: Double,
                                  maxDeltaStep: Double, maxBinByFeature: Array[Int], minDataInLeaf: Int,
                                  featureNames: Array[String], delegate: Option[LightGBMDelegate],
-                                 chunkSize: Int, dartModeParams: DartModeParams)
+                                 dartModeParams: DartModeParams, executionParams: ExecutionParams)
   extends TrainParams {
   override def toString(): String = {
     val extraStr =
@@ -100,7 +100,7 @@ case class RegressorTrainParams(parallelism: String, topK: Int, numIterations: I
                                 isProvideTrainingMetric: Boolean, metric: String, minGainToSplit: Double,
                                 maxDeltaStep: Double, maxBinByFeature: Array[Int], minDataInLeaf: Int,
                                 featureNames: Array[String], delegate: Option[LightGBMDelegate],
-                                chunkSize: Int, dartModeParams: DartModeParams)
+                                dartModeParams: DartModeParams, executionParams: ExecutionParams)
   extends TrainParams {
   override def toString(): String = {
     s"alpha=$alpha tweedie_variance_power=$tweedieVariancePower boost_from_average=${boostFromAverage.toString} " +
@@ -122,7 +122,7 @@ case class RankerTrainParams(parallelism: String, topK: Int, numIterations: Int,
                              metric: String, evalAt: Array[Int], minGainToSplit: Double,
                              maxDeltaStep: Double, maxBinByFeature: Array[Int], minDataInLeaf: Int,
                              featureNames: Array[String], delegate: Option[LightGBMDelegate],
-                             chunkSize: Int, dartModeParams: DartModeParams)
+                             dartModeParams: DartModeParams, executionParams: ExecutionParams)
   extends TrainParams {
   override def toString(): String = {
     val labelGainStr =
@@ -142,3 +142,5 @@ case class DartModeParams(dropRate: Double, maxDrop: Int, skipDrop: Double,
     s"uniform_drop=$uniformDrop "
   }
 }
+
+case class ExecutionParams(chunkSize: Int, matrixType: String) extends Serializable
diff --git a/src/main/scala/com/microsoft/ml/spark/lightgbm/TrainUtils.scala b/src/main/scala/com/microsoft/ml/spark/lightgbm/TrainUtils.scala
index 1cf98fbc2b..9540acee32 100644
--- a/src/main/scala/com/microsoft/ml/spark/lightgbm/TrainUtils.scala
+++ b/src/main/scala/com/microsoft/ml/spark/lightgbm/TrainUtils.scala
@@ -29,14 +29,23 @@ private object TrainUtils extends Serializable {
   def generateDataset(rowsIter: Iterator[Row], columnParams: ColumnParams,
                       referenceDataset: Option[LightGBMDataset], schema: StructType,
                       log: Logger, trainParams: TrainParams): Option[LightGBMDataset] = {
-    val hrow = rowsIter.next()
+    val (concatRowsIter: Iterator[Row], isSparse: Boolean) =
+      if (trainParams.executionParams.matrixType == "auto") {
+        sampleRowsForArrayType(rowsIter, schema, columnParams)
+      } else if (trainParams.executionParams.matrixType == "sparse") {
+        (rowsIter: Iterator[Row], true)
+      } else if (trainParams.executionParams.matrixType == "dense") {
+        (rowsIter: Iterator[Row], false)
+      } else {
+        throw new Exception(s"Invalid parameter matrix type specified: ${trainParams.executionParams.matrixType}")
+      }
     var datasetPtr: Option[LightGBMDataset] = None
-    if (hrow.get(schema.fieldIndex(columnParams.featuresColumn)).isInstanceOf[DenseVector]) {
-      datasetPtr = aggregateDenseStreamedData(hrow, rowsIter, columnParams, referenceDataset, schema, log, trainParams)
+    if (!isSparse) {
+      datasetPtr = aggregateDenseStreamedData(concatRowsIter, columnParams, referenceDataset, schema, log, trainParams)
       // Validate generated dataset has the correct number of rows and cols
       datasetPtr.get.validateDataset()
     } else {
-      val rows = (Iterator[Row](hrow) ++ rowsIter).toArray
+      val rows = concatRowsIter.toArray
       val numRows = rows.length
       val labels = rows.map(row => row.getDouble(schema.fieldIndex(columnParams.labelColumn)))
       val rowsAsSparse = rows.map(row => row.get(schema.fieldIndex(columnParams.featuresColumn)) match {
@@ -60,6 +69,24 @@ private object TrainUtils extends Serializable {
     datasetPtr
   }
 
+  /**
+    * Sample the first several rows to determine whether to construct sparse or dense matrix in lightgbm native code.
+    * @param rowsIter Iterator of rows.
+    * @param schema The schema.
+    * @param columnParams The column parameters.
+    * @return A reconstructed iterator with the same original rows and whether the matrix should be sparse or dense.
+    */
+  def sampleRowsForArrayType(rowsIter: Iterator[Row], schema: StructType,
+                             columnParams: ColumnParams): (Iterator[Row], Boolean) = {
+    val numSampledRows = 10
+    val sampleRows = rowsIter.take(numSampledRows).toArray
+    val numDense = sampleRows.map(row =>
+      row.get(schema.fieldIndex(columnParams.featuresColumn)).isInstanceOf[DenseVector]).filter(value => value).length
+    val numSparse = sampleRows.length - numDense
+    // recreate the iterator
+    (sampleRows.toIterator ++ rowsIter, numSparse > numDense)
+  }
+
   def getRowAsDoubleArray(row: Row, columnParams: ColumnParams, schema: StructType): Array[Double] = {
     row.get(schema.fieldIndex(columnParams.featuresColumn)) match {
       case dense: DenseVector => dense.toArray
@@ -108,11 +135,11 @@ private object TrainUtils extends Serializable {
     initScoreChunkedArrayOpt.foreach(_.release())
   }
 
-  def aggregateDenseStreamedData(hrow: Row, rowsIter: Iterator[Row], columnParams: ColumnParams,
+  def aggregateDenseStreamedData(rowsIter: Iterator[Row], columnParams: ColumnParams,
                                  referenceDataset: Option[LightGBMDataset], schema: StructType,
                                  log: Logger, trainParams: TrainParams): Option[LightGBMDataset] = {
     var numRows = 0
-    val chunkSize = trainParams.chunkSize
+    val chunkSize = trainParams.executionParams.chunkSize
     val labelsChunkedArray = new floatChunkedArray(chunkSize)
     val weightChunkedArrayOpt = columnParams.weightColumn.map { _ => new floatChunkedArray(chunkSize) }
     val initScoreChunkedArrayOpt = columnParams.initScoreColumn.map { _ => new doubleChunkedArray(chunkSize) }
@@ -120,8 +147,8 @@ private object TrainUtils extends Serializable {
     val groupColumnValues: ListBuffer[Row] = new ListBuffer[Row]()
     try {
       var numCols = 0
-      while (rowsIter.hasNext || numRows == 0) {
-        val row = if (numRows == 0) hrow else rowsIter.next()
+      while (rowsIter.hasNext) {
+        val row = rowsIter.next()
         numRows += 1
         labelsChunkedArray.add(row.getDouble(schema.fieldIndex(columnParams.labelColumn)).toFloat)
         columnParams.weightColumn.map { col =>
diff --git a/src/test/scala/com/microsoft/ml/spark/lightgbm/split1/VerifyLightGBMClassifier.scala b/src/test/scala/com/microsoft/ml/spark/lightgbm/split1/VerifyLightGBMClassifier.scala
index bc69f30256..3efbba9765 100644
--- a/src/test/scala/com/microsoft/ml/spark/lightgbm/split1/VerifyLightGBMClassifier.scala
+++ b/src/test/scala/com/microsoft/ml/spark/lightgbm/split1/VerifyLightGBMClassifier.scala
@@ -320,12 +320,15 @@ class VerifyLightGBMClassifier extends Benchmarks with EstimatorFuzzing[LightGBM
   test("Verify LightGBM Classifier with dart mode parameters") {
     // Assert the dart parameters work without failing and setting them to tuned values improves performance
     val Array(train, test) = pimaDF.randomSplit(Array(0.8, 0.2), seed)
-    val scoredDF1 = baseModel.setBoostingType("dart").fit(train).transform(test)
+    val scoredDF1 = baseModel.setBoostingType("dart")
+      .setMaxDrop(1)
+      .setSkipDrop(0.9)
+      .fit(train).transform(test)
     val scoredDF2 = baseModel.setBoostingType("dart")
       .setXGBoostDartMode(true)
       .setDropRate(0.6)
       .setMaxDrop(60)
-      .setSkipDrop(0.6)
+      .setSkipDrop(0.4)
       .setUniformDrop(true)
       .fit(train).transform(test)
     assertBinaryImprovement(scoredDF1, scoredDF2)
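
The "auto" heuristic in sampleRowsForArrayType can be illustrated outside Spark. The following standalone sketch is illustrative only: plain arrays stand in for Spark rows, and detectSparse and isDense are hypothetical names, not part of the patch. It peeks at up to ten rows, votes dense versus sparse, and stitches the consumed sample back onto the front of the iterator so no rows are lost.

    object MatrixTypeSketch {
      // Returns the rebuilt iterator plus whether to treat the data as sparse.
      def detectSparse[T](rows: Iterator[T])(isDense: T => Boolean): (Iterator[T], Boolean) = {
        val sample = rows.take(10).toArray
        val numDense = sample.count(isDense)
        val numSparse = sample.length - numDense
        // Prepend the consumed sample back onto the remaining iterator.
        (sample.iterator ++ rows, numSparse > numDense)
      }

      def main(args: Array[String]): Unit = {
        // Call a row "dense" when more than half of its entries are non-zero.
        val rows = Iterator.fill(100)(Array(0.0, 0.0, 1.0))
        val (rebuilt, sparse) = detectSparse(rows)(r => r.count(_ != 0.0) * 2 > r.length)
        println(s"sparse=$sparse, rows=${rebuilt.length}") // sparse=true, rows=100
      }
    }

Because only a fixed-size prefix is materialized, the detection cost is constant per partition regardless of partition size; the trade-off is that a partition whose first ten rows are unrepresentative can pick the less suitable representation, which is why the explicit "sparse" and "dense" overrides exist.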