
LR for churn prediction
Logistic regression (LR) is one of the most widely used classifiers for predicting a binary response. It is a linear ML method, as described in Chapter 1, Analyzing Insurance Severity Claim. Its loss function is the logistic loss, which, for a label y in {-1, +1}, a feature vector x, and a weight vector w, can be written as follows:

$$L(w; x, y) = \log\left(1 + \exp\left(-y\, w^T x\right)\right)$$

For a binary classification problem, the algorithm outputs a binary LR model such that, for a given new data point, denoted by x, the model makes predictions by applying the logistic function:

$$f(z) = \frac{1}{1 + e^{-z}}$$

In the preceding equation, z = w^T x; if f(w^T x) > 0.5, the outcome is positive; otherwise, it is negative.
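The decision rule just described can be sketched in a few lines of plain Scala. This is only an illustration that does not depend on Spark; the object name LogisticSketch and its helper functions are hypothetical and are not part of the chapter's code:

```scala
object LogisticSketch {
  // Logistic (sigmoid) function: maps any real z to the interval (0, 1).
  def sigmoid(z: Double): Double = 1.0 / (1.0 + math.exp(-z))

  // Dot product w^T x of two equal-length vectors.
  def dot(w: Seq[Double], x: Seq[Double]): Double = {
    require(w.length == x.length, "w and x must have the same length")
    w.zip(x).map { case (wi, xi) => wi * xi }.sum
  }

  // Predict 1.0 (positive) when f(w^T x) exceeds the threshold, else 0.0 (negative).
  def predict(w: Seq[Double], x: Seq[Double], threshold: Double = 0.5): Double =
    if (sigmoid(dot(w, x)) > threshold) 1.0 else 0.0
}
```

For example, with the made-up weights Seq(1.0, -2.0), the point Seq(3.0, 1.0) gives z = 1 and is classified as positive, whereas Seq(1.0, 1.0) gives z = -1 and is classified as negative.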
Note that, unlike linear regression, logistic regression is designed for classification: its responses are probabilities of class membership rather than unbounded continuous values. Moreover, the model can be regularized flexibly to suit the problem at hand.
Most importantly, whereas linear regression can predict only continuous values, logistic regression generalizes the linear model so that it can predict discrete values.
Now that we know logistic regression's working principle, let's start using the Spark-based implementation of logistic regression. Let's start by importing the required packages and libraries:
import org.apache.spark._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.ml.classification.{BinaryLogisticRegressionSummary, LogisticRegression, LogisticRegressionModel}
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.tuning.{ParamGridBuilder, CrossValidator}
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
Now, let's create a Spark session and import its implicits:
val spark: SparkSession = SparkSessionCreate.createSession("ChurnPredictionLogisticRegression")
import spark.implicits._
We now need to define some hyperparameters to train a logistic regression-based pipeline:
val numFolds = 10
val MaxIter: Seq[Int] = Seq(100)
val RegParam: Seq[Double] = Seq(1.0) // regularization strength; larger values penalize the coefficients more
val Tol: Seq[Double] = Seq(1e-8) // convergence tolerance for iterative algorithms
val ElasticNetParam: Seq[Double] = Seq(0.0001) // L1/L2 mixing ratio: 0.0 is pure L2, 1.0 is pure L1
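The RegParam is a scalar that adjusts the strength of the regularization: a small value constrains the coefficients only weakly, whereas a large value shrinks them strongly toward zero and yields a simpler model. The ElasticNetParam sets the mix between the two penalties: 0.0 gives a pure L2 penalty and 1.0 gives a pure L1 penalty.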
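To make the roles of these two parameters concrete, the following plain Scala sketch computes the elastic-net penalty in the form given in the Spark documentation, that is, regParam * (alpha * ||w||_1 + (1 - alpha) / 2 * ||w||_2^2), where alpha is the elastic net param. The object and function names are hypothetical, and the weight vector in the usage note is made up:

```scala
object ElasticNetSketch {
  // Elastic-net penalty added to the loss:
  //   regParam * (alpha * ||w||_1 + (1 - alpha) / 2 * ||w||_2^2)
  // where alpha = elasticNetParam in [0, 1].
  def penalty(w: Seq[Double], regParam: Double, elasticNetParam: Double): Double = {
    require(elasticNetParam >= 0.0 && elasticNetParam <= 1.0,
      "elasticNetParam must lie in [0, 1]")
    val l1 = w.map(wi => math.abs(wi)).sum   // L1 norm
    val l2Squared = w.map(wi => wi * wi).sum // squared L2 norm
    regParam * (elasticNetParam * l1 + (1.0 - elasticNetParam) / 2.0 * l2Squared)
  }
}
```

For the weights Seq(3.0, -4.0) and a regularization param of 1.0, an elastic net param of 0.0 yields the pure L2 penalty 12.5, and 1.0 yields the pure L1 penalty 7.0. The value 0.0001 used in this section is therefore almost a pure L2 penalty.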
On the other hand, the Tol parameter is used for the convergence tolerance for iterative algorithms such as logistic regression or linear SVM. Now, once we have the hyperparameters defined and initialized, the next task is to instantiate a logistic regression estimator, as follows:
val lr = new LogisticRegression()
.setLabelCol("label")
.setFeaturesCol("features")
Now that we have three transformers and an estimator ready, the next task is to chain them in a single pipeline, where each of them acts as a stage:
val pipeline = new Pipeline()
.setStages(Array(PipelineConstruction.ipindexer,
PipelineConstruction.labelindexer,
PipelineConstruction.assembler, lr))
To perform a grid search over the hyperparameter space, we need to define that space first. Here, the functional programming properties of Scala are quite handy, because we just add each parameter of the estimator, together with the candidate values to be evaluated, to the parameter grid. Combined with a cross-validation evaluator, this grid forms a model selection workflow. The following grid searches through logistic regression's max iteration, regularization param, tolerance, and Elastic Net param for the best model. Because each sequence defined earlier holds a single value, the grid contains exactly one combination; add more candidate values to widen the search:
val paramGrid = new ParamGridBuilder()
.addGrid(lr.maxIter, MaxIter)
.addGrid(lr.regParam, RegParam)
.addGrid(lr.tol, Tol)
.addGrid(lr.elasticNetParam, ElasticNetParam)
.build()
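Conceptually, a parameter grid is the Cartesian product of the candidate values of each parameter. The following plain Scala sketch illustrates this idea and the resulting number of model fits; it is not how ParamGridBuilder is implemented, the names GridSketch, buildGrid, and cvFits are hypothetical, and all values are treated as Double for simplicity:

```scala
object GridSketch {
  // Cartesian product of the candidate values: one Map per hyperparameter combination.
  def buildGrid(candidates: Seq[(String, Seq[Double])]): Seq[Map[String, Double]] =
    candidates.foldLeft(Seq(Map.empty[String, Double])) { case (acc, (name, values)) =>
      for (combo <- acc; v <- values) yield combo + (name -> v)
    }

  // Number of model fits in k-fold cross-validation over the grid,
  // not counting a final refit of the best combination on the full training set.
  def cvFits(gridSize: Int, numFolds: Int): Int = gridSize * numFolds
}
```

With the single-valued sequences used in this section, buildGrid returns one combination, so 10-fold cross-validation fits 10 models. Three regularization values combined with two Elastic Net values would give 6 combinations and 60 fits.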
We then need to define a BinaryClassificationEvaluator, since this is a binary classification problem. Using this evaluator, the model will be evaluated by comparing the test label column with the test prediction column. The evaluator supports two metrics: the area under the receiver operating characteristic (ROC) curve, which is the default, and the area under the precision-recall curve. Note that the following code passes the hard prediction column as the raw prediction column, so the metric is computed from 0/1 predictions rather than from continuous scores:
val evaluator = new BinaryClassificationEvaluator()
.setLabelCol("label")
.setRawPredictionCol("prediction")
We use a CrossValidator for best model selection. The CrossValidator uses the Estimator Pipeline, the Parameter Grid, and the Classification Evaluator. It iterates through the max iteration, regularization param, tolerance, and Elastic Net param combinations built by the ParamGridBuilder and evaluates each resulting model on 10 different train/validation splits for reliable results, that is, 10-fold cross-validation:
val crossval = new CrossValidator()
.setEstimator(pipeline)
.setEvaluator(evaluator)
.setEstimatorParamMaps(paramGrid)
.setNumFolds(numFolds)
The preceding code is meant to perform cross-validation. The validator itself uses the BinaryClassificationEvaluator to score each parameter combination in the grid on each fold, which helps guard against overfitting to a single train/validation split.
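The idea behind k-fold cross-validation can be sketched in plain Scala as follows. This is a simplified illustration only: it assigns rows to folds deterministically in round-robin order, whereas Spark's CrossValidator splits the data randomly, and the names KFoldSketch and kFold are hypothetical:

```scala
object KFoldSketch {
  // Assign each of n row indices to one of k folds in round-robin order, then
  // return one (trainIndices, validationIndices) pair per fold.
  def kFold(n: Int, k: Int): Seq[(Seq[Int], Seq[Int])] = {
    require(k >= 2 && k <= n, "need 2 <= k <= n")
    val indices = 0 until n
    (0 until k).map { fold =>
      // Rows belonging to the current fold are held out for validation.
      val (validation, train) = indices.partition(_ % k == fold)
      (train, validation)
    }
  }
}
```

With 20 rows and 10 folds, every row is held out for validation exactly once, and each model is trained on the remaining 18 rows.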
Although there is a lot going on behind the scenes, the interface to our CrossValidator object stays slim and familiar, as CrossValidator also extends Estimator and supports the fit method. This means that, after calling fit, the complete predefined pipeline, including all feature preprocessing and the LR classifier, is executed multiple times, each time with a different hyperparameter vector:
val cvModel = crossval.fit(Preprocessing.trainDF)
Now it's time to evaluate the predictive power of the LR model we created using the test dataset, which has not been used for any training or cross-validation so far, that is, data unseen by the model. As a first step, we need to pass the test set through the model pipeline, which will map the features according to the same mechanism we described in the preceding feature engineering step:
val predictions = cvModel.transform(Preprocessing.testSet)
val result = predictions.select("label", "prediction", "probability")
val resultDF = result.withColumnRenamed("prediction", "Predicted_label")
resultDF.show(10)
>>>

The prediction probabilities can also be very useful in ranking customers according to their likelihood of churning. This way, the limited resources that a telecommunication business can devote to customer retention can be focused on the most valuable customers.
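Such a ranking can be sketched in plain Scala as follows, assuming that the churn probability of each customer has already been extracted from the predictions. The Scored record, the customer IDs, and the topAtRisk function are hypothetical and serve only as an illustration:

```scala
object RankingSketch {
  // Hypothetical record: a customer ID plus the predicted probability of churn.
  final case class Scored(customerId: String, churnProbability: Double)

  // Return the n customers with the highest predicted churn probability.
  def topAtRisk(scored: Seq[Scored], n: Int): Seq[Scored] =
    scored.sortBy(s => -s.churnProbability).take(n)
}
```

A retention campaign with a budget for n customers would then contact only the customers returned by topAtRisk.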
However, from the previous prediction DataFrame alone, it is really difficult to judge the classification accuracy. In the second step, we evaluate the predictions using the BinaryClassificationEvaluator, as follows:
val accuracy = evaluator.evaluate(predictions)
println("Classification accuracy: " + accuracy)
>>>
Classification accuracy: 0.7670592565329408
So, we get a score of about 77% from our binary classification model. Note that the code labels this value as accuracy, but the evaluator's default metric is the area under the ROC curve. Plain accuracy is not very informative for a binary classifier anyway, particularly when the classes are imbalanced.
Hence, researchers often recommend other performance metrics, such as area under the precision-recall curve and area under the ROC curve. However, for this we need to construct an RDD containing the raw scores on the test set:
val predictionAndLabels = predictions
.select("prediction", "label")
.rdd.map(x => (x(0).asInstanceOf[Double], x(1)
.asInstanceOf[Double]))
Now, the preceding RDD can be used to compute the two previously-mentioned performance metrics:
val metrics = new BinaryClassificationMetrics(predictionAndLabels)
println("Area under the precision-recall curve: " + metrics.areaUnderPR)
println("Area under the receiver operating characteristic (ROC) curve : " + metrics.areaUnderROC)
>>>
Area under the precision-recall curve: 0.5761887477313975
Area under the receiver operating characteristic (ROC) curve: 0.7670592565329408
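To see what the area under the ROC curve measures, the following plain Scala sketch computes it as the fraction of positive-negative pairs in which the positive example receives the higher score, counting ties as one half. This is a simple quadratic-time illustration and not Spark's implementation; the names AucSketch and areaUnderROC, as well as the data in the usage note, are hypothetical:

```scala
object AucSketch {
  // Area under the ROC curve via pairwise comparison of (score, label) pairs:
  // the share of positive-negative pairs ranked correctly, with ties counted as 0.5.
  def areaUnderROC(scoreAndLabels: Seq[(Double, Double)]): Double = {
    val pos = scoreAndLabels.collect { case (score, label) if label == 1.0 => score }
    val neg = scoreAndLabels.collect { case (score, label) if label == 0.0 => score }
    require(pos.nonEmpty && neg.nonEmpty, "need at least one positive and one negative")
    val wins = for (p <- pos; n <- neg) yield {
      if (p > n) 1.0 else if (p == n) 0.5 else 0.0
    }
    wins.sum / (pos.size.toDouble * neg.size)
  }
}
```

When the scores are hard 0/1 predictions, as in this section, the ROC curve has a single operating point and the area reduces to the average of the true positive rate and the true negative rate. For example, with one of two positives and three of four negatives predicted correctly, the result is (0.5 + 0.75) / 2 = 0.625.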
In this case, the evaluation returns an area under the ROC curve of about 77%, but an area under the precision-recall curve of only about 58%. Next, we calculate some more metrics; for example, the rates of true and false positive and negative predictions are also useful to evaluate the model's performance:
- True positive: How often the model correctly predicted subscription canceling
- False positive: How often the model incorrectly predicted subscription canceling
- True negative: How often the model correctly predicted no canceling at all
- False negative: How often the model incorrectly predicted no canceling
val lp = predictions.select("label", "prediction")
val counttotal = predictions.count()
val correct = lp.filter($"label" === $"prediction").count()
val wrong = lp.filter(not($"label" === $"prediction")).count()
val ratioWrong = wrong.toDouble / counttotal.toDouble
val ratioCorrect = correct.toDouble / counttotal.toDouble
// Assumes that the label 1.0 encodes churn (the positive class)
val truep = lp.filter($"prediction" === 1.0).filter($"label" ===
$"prediction").count() / counttotal.toDouble
val truen = lp.filter($"prediction" === 0.0).filter($"label" ===
$"prediction").count() / counttotal.toDouble
val falsep = lp.filter($"prediction" === 1.0).filter(not($"label" ===
$"prediction")).count() / counttotal.toDouble
val falsen = lp.filter($"prediction" === 0.0).filter(not($"label" ===
$"prediction")).count() / counttotal.toDouble
println("Total Count : " + counttotal)
println("Correct : " + correct)
println("Wrong: " + wrong)
println("Ratio wrong: " + ratioWrong)
println("Ratio correct: " + ratioCorrect)
println("Ratio true positive : " + truep)
println("Ratio false positive : " + falsep)
println("Ratio true negative : " + truen)
println("Ratio false negative : " + falsen)
>>>

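The four counts above are also the basis of metrics such as precision and recall. The following plain Scala sketch derives them from a list of (label, prediction) pairs, assuming that 1.0 encodes churn; the object name ConfusionSketch, the Counts record, and the sample data in the usage note are hypothetical:

```scala
object ConfusionSketch {
  // Counts of true/false positives and negatives.
  final case class Counts(tp: Long, fp: Long, tn: Long, fn: Long)

  // Tally the confusion-matrix cells from (label, prediction) pairs.
  def count(labelAndPrediction: Seq[(Double, Double)], positive: Double = 1.0): Counts =
    labelAndPrediction.foldLeft(Counts(0L, 0L, 0L, 0L)) { case (c, (label, prediction)) =>
      (label == positive, prediction == positive) match {
        case (true, true)   => c.copy(tp = c.tp + 1) // churn predicted, churn observed
        case (false, true)  => c.copy(fp = c.fp + 1) // churn predicted, no churn observed
        case (false, false) => c.copy(tn = c.tn + 1) // no churn predicted, none observed
        case (true, false)  => c.copy(fn = c.fn + 1) // no churn predicted, churn observed
      }
    }

  // Share of predicted churners that actually churned.
  def precision(c: Counts): Double =
    if (c.tp + c.fp == 0) 0.0 else c.tp.toDouble / (c.tp + c.fp)

  // Share of actual churners that the model caught.
  def recall(c: Counts): Double =
    if (c.tp + c.fn == 0) 0.0 else c.tp.toDouble / (c.tp + c.fn)
}
```

For instance, one true positive, one false positive, one false negative, and three true negatives give a precision of 0.5 and a recall of 0.5, even though two thirds of all predictions are correct.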
We have not yet achieved good accuracy, so let's continue trying other classifiers, such as SVM. This time, we will use the linear SVM implementation from the Apache Spark ML package.