119-PySpark Logistic Regression Pipeline

ML Pipeline
PySpark LogisticRegression
PySpark LogisticRegressionModel
PySpark LogisticRegressionTrainingSummary

Machine Learning Pipelines: A Pipeline chains multiple Transformers and Estimators together to specify an ML workflow.
Transformer: A Transformer is an algorithm that transforms one DataFrame into another DataFrame.
Estimator: An Estimator is an algorithm that can be fit() on a DataFrame to produce a Transformer (see the sketch just below).
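
A minimal sketch of the relationship, assuming a SparkSession named spark like the one created in the full example below (the toy DataFrame here is hypothetical, for illustration only):

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer

toy = spark.createDataFrame([(0, "a b spark", 1.0), (1, "c d", 0.0)], ["id", "text", "label"])

# Transformer: transform() maps one DataFrame to another
words = Tokenizer(inputCol="text", outputCol="words").transform(toy)
feats = HashingTF(inputCol="words", outputCol="features").transform(words)

# Estimator: fit() learns from a DataFrame and returns a Transformer (a model)
model = LogisticRegression(maxIter=5).fit(feats)
model.transform(feats).select("id", "prediction").show()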

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import SparkSession
import os

# Point at a local Spark installation; these paths are machine-specific.
os.environ["PYTHONPATH"] = "/Users/user/opt/spark-3.1.2-bin-hadoop3.2/python"
os.environ["SPARK_HOME"] = "/Users/user/opt/spark-3.1.2-bin-hadoop3.2"

spark = SparkSession.builder.appName("QuickStart").getOrCreate()


def test_pipelines():
    training = spark.createDataFrame([
        (0, "a b c d e spark", 1.0),
        (1, "b d", 0.0),
        (2, "spark f g h", 1.0),
        (3, "hadoop mapreduce", 0.0)], ["id", "text", "label"])

    # 1. Transformer
    tokenizer = Tokenizer(inputCol="text", outputCol="words")

    # 2. Transformer
    hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")

    # 3. Estimator
    lr = LogisticRegression(maxIter=10, regParam=0.001)

    pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

    # fit the pipeline to generate a Transformer (a PipelineModel)
    model = pipeline.fit(training)

    test = spark.createDataFrame([
        (4, "spark i j k"),
        (5, "l m n"),
        (6, "spark hadoop spark"),
        (7, "apache hadoop")
    ], ["id", "text"])

    # "model" is a Transformer after calling fit()
    prediction = model.transform(test)

    selected = prediction.select("id", "text", "probability", "prediction")

    for row in selected.collect():
        rid, text, prob, predict = row
        print(">>>>rowId={}, text={}, probability={}, predict={}".format(rid, text, prob, predict))
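
    # Illustrative addition (not in the original): the fitted PipelineModel
    # exposes its fitted stages; the last stage here is the trained
    # LogisticRegressionModel.
    lr_model = model.stages[-1]
    print(">>>>numFeatures={}".format(lr_model.numFeatures))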


def test_step_by_step():
    training = spark.createDataFrame([
        (0, "a b c d e spark", 1.0),
        (1, "b d", 0.0),
        (2, "spark f g h", 1.0),
        (3, "hadoop mapreduce", 0.0)], ["id", "text", "label"])

    tokenizer = Tokenizer(inputCol="text", outputCol="words")
    data = tokenizer.transform(training)
    data.show(truncate=False)  # show() prints and returns None, so don't wrap it in print()

    hashingTF = HashingTF(inputCol="words", outputCol="features")
    data = hashingTF.transform(data)
    data.show(truncate=False)

    lr_estimator = LogisticRegression(maxIter=10, regParam=0.01)
    print(lr_estimator.getFeaturesCol())  # "features" by default
    print(lr_estimator.getLabelCol())     # "label" by default

    # generate a LogisticRegressionModel by calling fit()
    lr_model = lr_estimator.fit(data)

    print(lr_model.coefficients)

    # BinaryLogisticRegressionTrainingSummary
    summary = lr_model.summary

    print(">>>threshold={}".format(lr_model.getThreshold()))  # call the getter, not the bound method
    print(">>>accuracy={}".format(summary.accuracy))
    print(">>>precisionByLabel={}".format(summary.precisionByLabel))
    print(">>>areaUnderROC={}".format(summary.areaUnderROC))
    print(">>>recallByLabel={}".format(summary.recallByLabel))
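
    # Illustrative addition (not in the original): the binary training summary
    # also exposes the ROC curve as a DataFrame with FPR and TPR columns.
    summary.roc.show(5, truncate=False)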


    print("\n-----------------------------------\n")
    test = spark.createDataFrame([
        (4, "spark i j k", 1.0),
        (5, "l m n", 0.0),
        (6, "spark hadoop spark", 1.0),
        (7, "apache hadoop", 0.0)], ["id", "text", "label"])

    # get BinaryLogisticRegressionSummary/LogisticRegressionSummary by calling evaluate()
    evaluate_summary = lr_model.evaluate(hashingTF.transform(tokenizer.transform(test)))
    print("<<< accuracy={}".format(evaluate_summary.accuracy))
    print("<<< precisionByLabel={}".format(evaluate_summary.precisionByLabel))
    print("<<< areaUnderROC={}".format(evaluate_summary.areaUnderROC))
    print("<<< recallByLabel={}".format(evaluate_summary.recallByLabel))

    selected = evaluate_summary.predictions.select("id", "text", "probability", "prediction")
    for row in selected.collect():
        rid, text, prob, predict = row
        print("<<< rowId={}, text={}, probability={}, predict={}".format(rid, text, prob, predict))

test_pipelines()

test_step_by_step()
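
# Illustrative addition: release the SparkSession's resources once both demos have run.
spark.stop()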