Scala Machine Learning Projects
上QQ阅读APP看书,第一时间看更新

Data preprocessing

Now that we have looked at some data properties, the next task is to do some preprocessing, such as cleaning, before getting the training set. For this part, use the Preprocessing.scala file. For this part, the following imports are required:

import org.apache.spark.ml.feature.{ StringIndexer, StringIndexerModel}
import org.apache.spark.ml.feature.VectorAssembler

Then we load both the training and the test set as shown in the following code:

var trainSample = 1.0 
var testSample = 1.0 
val train = "data/insurance_train.csv" 
val test = "data/insurance_test.csv" 
val spark = SparkSessionCreate.createSession()
import spark.implicits._ println("Reading data from " + train + " file")

val trainInput = spark.read .option("header", "true") .option("inferSchema", "true") .format("com.databricks.spark.csv") .load(train) .cache val testInput = spark.read .option("header", "true") .option("inferSchema", "true") .format("com.databricks.spark.csv") .load(test) .cache

The next task is to prepare the training and test set for our ML model to be learned. In the preceding DataFrame out of the training dataset, we renamed the loss to label. Then the content of train.csv was split into training and (cross) validation data, 75% and 25%, respectively.

The content of test.csv is used for evaluating the ML model. Both original DataFrames are also sampled, which is particularly useful for running fast executions on your local machine:

println("Preparing data for training model") 
var data = trainInput.withColumnRenamed("loss", "label").sample(false, trainSample) 

We also should do null checking. Here, I have used a naïve approach. The thing is that if the training DataFrame contains any null values, we completely drop those rows. This makes sense since a few rows out of 188,318 do no harm. However, feel free to adopt another approach such as null value imputation:

var DF = data.na.drop() 
if (data == DF) println("No null values in the DataFrame") else{ println("Null values exist in the DataFrame") data = DF }
val seed = 12345L val splits = data.randomSplit(Array(0.75, 0.25), seed) val (trainingData, validationData) = (splits(0), splits(1))

Then we cache both the sets for faster in-memory access:

trainingData.cache 
validationData.cache 

Additionally, we should perform the sampling of the test set that will be required in the evaluation step:

val testData = testInput.sample(false, testSample).cache 

Since the training set contains both the numerical and categorical values, we need to identify and treat them separately. First, let's identify only the categorical column:

def isCateg(c: String): Boolean = c.startsWith("cat") 
def categNewCol(c: String): String = if (isCateg(c)) s"idx_${c}" else c 

Then, the following method is used to remove categorical columns with too many categories, which we already discussed in the preceding section:

def removeTooManyCategs(c: String): Boolean = !(c matches "cat(109$|110$|112$|113$|116$)")

Now the following method is used to select only feature columns. So essentially, we should remove the ID (since the ID is just the identification number of the clients, it does not carry any non-trivial information) and the label column:

def onlyFeatureCols(c: String): Boolean = !(c matches "id|label") 

Well, so far we have treated some bad columns that are either trivial or not needed at all. Now the next task is to construct the definitive set of feature columns:

val featureCols = trainingData.columns 
    .filter(removeTooManyCategs) 
    .filter(onlyFeatureCols) 
    .map(categNewCol) 
StringIndexer encodes a given string column of labels to a column of label indices. If the input column is numeric in nature, we cast it to string using the StringIndexer and index the string values. When downstream pipeline components such as Estimator or Transformer make use of this string-indexed label, you must set the input column of the component to this string-indexed column name. In many cases, you can set the input column with setInputCol.

Now we need to use the StringIndexer() for categorical columns:

val stringIndexerStages = trainingData.columns.filter(isCateg) 
      .map(c => new StringIndexer() 
      .setInputCol(c) 
      .setOutputCol(categNewCol(c)) 
      .fit(trainInput.select(c).union(testInput.select(c)))) 

Note that this is not an efficient approach. An alternative approach would be using a OneHotEncoder estimator.

OneHotEncoder maps a column of label indices to a column of binary vectors, with a single one-value at most. This encoding permits algorithms that expect continuous features, such as logistic regression, to utilize categorical features.

Now let's use the VectorAssembler() to transform a given list of columns into a single vector column:

val assembler = new VectorAssembler() 
    .setInputCols(featureCols) 
    .setOutputCol("features")
VectorAssembler is a transformer. It combines a given list of columns into a single vector column. It is useful for combining the raw features and features generated by different feature transformers into one feature vector, in order to train ML models such as logistic regression and decision trees.

That's all we need before we start training the regression models. First, we start training the LR model and evaluate the performance.