Spark平台(高级版十六)ML Pipeline集群学习

完整目录、平台简介、安装环境及版本:参考《Spark平台(高级版)概览》

十六、Spark ML Pipeline机器学习

Spark ML Pipeline 的引入,对目标数据集结构复杂,需要多次处理,或是在学习过程中,要使用多个转化器 (Transformer) 和预测器 (Estimator),这种情况下使用 MLlib 将会让程序结构极其复杂。所以,一个可用于构建复杂机器学习工作流应用的新库已经出现了,它就是 Spark 1.2 版本之后引入的 ML Pipeline。ML Pipeline 是建立在 DataFrames 上的更高层次的 API 库,可以使用户很方便的将对数据的不同处理阶段组合起来运行,从而使整个机器学习过程变得更加易用、简洁、规范和高效。 

16.1 Pipeline组件

Spark ML Pipeline主要包含2个核心的数据处理组件:Transformer、Estimator,其中它们都是Pipeline中PipelineStage的子类,另外一些抽象,如Model、Predictor、Classifier、Regressor等都是基于这两个核心组件衍生出来,比如,Model是一个Transformer,Predictor是一个Estimator,它们的关系如下类图所示: 

16.1.1 DataFrame数据集

Spark ML Pipeline使用DataFrame作为机器学习输入输出数据集的抽象。

DataFrame来自Spark SQL,表示对数据集的一种特殊抽象,它也是Dataset(它是Spark 1.6引入的表示分布式数据集的抽象接口),但是DataFrame通过为数据集中每行数据的每列指定列名的方式来组织Dataset,类似于关系数据库中的表,同时还在底层处理做了非常多的优化。DataFrame可以基于不同的数据源进行构建,比如结构化文件、Hive表、数据库、RDD等。或者更直白一点表达什么是DataFrame,可以认为它等价于Dataset[Row],表示DataFrame是一个Row类型数据对象的Dataset。

机器学习可以被应用于各种数据类型,例如向量、文本、图片、结构化数据。Spark ML API采用DataFrame的理由是,来自Spark SQL中的DataFrame接口的抽象,可以支持非常广泛的类型,而且表达非常直观,便于在Spark中进行处理。所以说,DataFrame是Spark ML最基础的对数据集的抽象,所有各种ML Pipeline组件都会基于DataFrame构建更加丰富、复杂的数据处理逻辑。

16.1.2 Transformer转换器

Transformer是一个算法,可以将一个 DataFrame 转换成另一个 DataFrame。主要抽象了两类操作:

16.1.2.1 特征变化

一个特征转换器Transformer输入一个DataFrame,读取其中一个或多个文本列,将其映射为新的特征向量列。输出一个新的带有特征向量列的DataFrame。

16.1.2.2 学习模型

一个学习模型转换器输入一个DataFrame,读取其中包括特征向量的列,预测每一个特征向量的标签。输出一个新的带有预测标签列的DataFrame。

Transformer类继承关系如图:

16.1.3 Estimator预测器

Estimator是一个算法。预测器通过 fit() 方法,接收一个 DataFrame 并产出一个模型。

Estimator用来训练模型,它的输入是一个DataFrame,输出是一个Model,Model是Spark ML中对机器学习模型的抽象和定义,Model其实是一个Transformer。一个机器学习算法是基于一个数据集进行训练的,Estimator对基于该训练集的机器学习算法进行了抽象。所以它的输入是一个数据集DataFrame,经过训练最终得到一个模型Model,也就是Transformer,一个Transformer又可以对输入的DataFrame执行变换操作。

Estimator类继承关系如图:

16.1.4 Pipeline

Pipeline 由一系列 stage 组成,形成一个机器学习工作流,每个 stage 为一个转换器 (Transformer) 或预测器 (Estimator)。这些 stage 的执行是按一定顺序的,输入的 DataFrame 在通过每个 stage 时被改变。在转换器阶段,transform() 方法作用在 DataFrame 上。预测器阶段,调用 fit() 方法来产生一个转换器(成为 PipelneModel 的一部分),然后该转换器的 transform() 方法作用在 DataFrame 上。

在机器学习过程中,通过一系列的算法来处理和学习数据是很普遍的,例如,一个简单的文档处理工作流可能包括以下几步:

  • 将每个文档分成单个词;
  • 将文档中的词转化成数字化的特征向量;
  • 基于特征向量和标签学习得到预测模型;

16.1.5 Parameter

Parameter 被用来设置 Transformer 或者 Estimator 的参数。现在,所有转换器和预测器可共享用于指定参数的公共API。

ParamMap是一组(参数,值)对。

16.2 基本过程

构建机器学习一般需要经历的4个主要阶段:数据准备、训练模型、评估模型和应用模型。

16.2.1 数据准备ETL

数据准备阶段,通常会有一个或者多个已经存在的数据集,数据集的状态可能距离生产该数据的源头非常近,数据格式多种多样,不规范、缺失值、错误值随处可见。还有可能,数据集包含多个不同类型的子数据集,单独拿出每一个子数据集对机器学习模型训练,都没有什么意义,除非我们就需要对这些子数据集进行一些处理。

对于这样的输入数据集,如果不加处理而直接拿来进行机器学习模型的训练,一个结果是根本无法使用这样数据集作为生产机器学习模型的输入;另一个结果是可以满足训练机器学习模型算法的输入格式等要求,但是训练出来的机器学习模型根本无法投入生产,带来期望的效果。

面向机器学习的数据准备阶段,可以看成是一个通用的数据ETL(Extract-Transform-Load,用来描述将数据从来源端经过提取、转换、加载至目端的过程)过程,这个ETL过程除了进行基础的规范数据格式、去除噪声、集成数据等,还包含一些机器学习特有的数据ETL过程,比如:特征抽取、降维、主成分分析PCA(Principal Components Analysis旨在利用降维的思想,把多指标转化为少数几个综合指标。)等。

可见,数据准备阶段主要是对数据进行ETL,在此基础上可能需要选择合适的数据分割策略,生成满足机器学习模型训练的训练集,和用于评估模型的测试集。

16.2.2 训练模型

训练模型是构建机器学习应用各个阶段中最核心的阶段。该阶段,首先根据给定的问题域,选择一个适合解决该领域问题的模型,然后考虑基于所选择数据的规模和特点,使用特定的算法来计算生成需要的模型。

模型可以理解为一个数学函数,该函数最终能够满足的效果是,根据给定的输入数据,就能得到或近似得到认为合理的结果。一个数学函数具有一个或多个参数,训练模型的结果就是确定这些参数的值。函数可能很简单,也可能很复杂。数据集可能有其特点,比如数据规模超大、数据在处理过程中精度的损失等等,要在所选择的数据集上进行训练学习,通常不能得到目标函数所有参数理论上的精确值。最终的目标是,能够在给定的数据集上具有很好地表现,可以根据实际情况做特殊处理。在实际应用中,往往提升精度会耗费大量资源和时间,再对比模型带来效果可能微乎其微,所以舍弃一定的精度也能很好地在实际应用中使用该模型。

训练模型就是从给定的数据集学习得到数据中潜在的规律,通过以函数的形式表示,经过计算处理求得目标数学函数对应的全部参数。基于最终得到的参数所构造的函数,能够使函数很好地解决假设的问题(训练数据集),模拟给定训练数据集同时,又具备很好的泛化能力,即不会欠拟合或过拟合。

16.2.3 评估模型

训练模型得到了一组参数,能够模拟给定训练数据集,但是如果对于未来未知的数据,模型的表现会如何?为了解决这个疑问,我们需要将训练得到的模型,作用在给定的测试数据集上,根据结果进行分析,确定模型的精度是否能够满足应用需求。训练数据集和测试数据集唯一不同的就是是否已知标签,而对数据本身的处理逻辑基本都是相同的。

另外,评价模型的优劣,验证模型的好坏,需要选择适合特拟定领域的度量方法,从而对模型进行客观的评价。比如,离线模型的评估,常用准确率、精确率-召回率,而在线模型可能会使用CTR、A/B测试等。

16.2.4 应用模型

一个经过验证可以投入使用的模型,可能会提供一个特殊的结果数据集,根据应用的需要对其进行进一步处理,比如推荐模型中的物品集合很大,可以通过对推荐的物品结果集进行再加工处理,对支持的应用提供快速的查询服务。模型也可能是一个不可读的模型,这种情况可能需要基于该模型开发一个服务,直接对外提供模型服务。

具体的如何使用模型,这要依赖于实际应用的特点和使用方式。

16.3 案例

用ssh远程登录节点app-11,切换到hadoop用户,启动集群。

16.3.1 整体流程

  • 利用组件StringIndexer针对alchemy_category进行编号alchemy_category_index,并将编号作为新的字段添加;
  • 利用组件OneHotEncoder将alchemy_category_index的分类特征字段转换为多个字段的vector,比如有14中类型,该行数据为第三种,则转换为(14,[3],[1.0])即(0,0,0,1,0,0,0,0,0,0,0,0,0,0),并新增字段alchemy_category_indexVec;
  • 利用VectorAssembler将多个特征字段整合成vector,此处特征值包括alchemy_category_indexVec和从alchemy_category_score开始直到最后,不包括最后的label字段。并新增字段features;
  • 利用Classifier的fit,根据训练数据训练模型;
  • 利用模型对测试数据进行预测,得出新的DataFrame。

16.3.2 数据上传

利用Spark MLlib二元决策树案例中用到的数据源作为测试数据进行网页的预测。

命令:hdfs dfs -ls /user/stumble/data

打开train.tsv文件,第一行为列名,用\t分割开:

test.tsv文件和train.tsv相比,没有label字段,这个文件适用于预测的,最后计算label值。

16.3.3 新建项目

Web页面输入:http://app-11:8888/登录

新建Python3项目

重命名为:MLPipeline

重命名后效果:

16.3.4 创建SparkSession

引入findspark,使用findspark初始化pyspark的依赖

import findspark
findspark.init()

引入:

from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SparkSession

创建,基于集群:

sparkSession = SparkSession \
    .builder \
    .appName("Python Spark SQL Hive integration example") \
    .master("spark://app-11:7077") \
    .enableHiveSupport() \
    .getOrCreate()

16.3.5 读取并分析文件

设置路径:

global Path
Path="hdfs://dmcluster/user/stumble/"

读取文件:CSV格式,将分隔符定位\t,保留头条字段名,用load也可以

trainDF = sparkSession.read.format("csv")\
                           .option("header","true")\
                           .option("delimiter","\t")\
                           .csv(Path+"data/train.tsv")

查看条数:trainDF.count()

DataFrame是带字段名和字段类型的,查看字段类型

trainDF.printSchema()

可以看出,都是默认用string类型。

16.3.6 整理数据

查看部分数据,发现书籍中存在很多”?”

trainDF.select("url","alchemy_category","alchemy_category_score","label").show()

自定义函数,将?转为0

UDF(User-defined functions, UDFs),即用户自定义函数,在Spark Sql的开发中十分常用,UDF对表中的每一行进行函数处理,返回新的值,有些类似与RDD编程中的Map()。

from pyspark.sql.functions import udf
def replace_question(x):
    return ("0" if x=="?" else x)
replace_question=udf(replace_question)

读取字段及类型转换用:

from pyspark.sql.functions import col
import pyspark.sql.types

将从从4列开始的所有列中?转为0,同时将这些字段中的类型都转为double。并去除1,2两列数据。

trainDF=trainDF.select(['url','alchemy_category']+
    replace_question(col(column)).cast("double").alias(column)
    for column in trainDF.columns[4:]])

查看转换并去除1,2列数据后的数据

trainDF.printSchema()

可以看出相对源数据,第1,2列已经去除。并且从第4个字段开始的所有字段类型都由string改为了double。

查看具体数据,确认?已经改为了0

trainDF.select("url","alchemy_category","alchemy_category_score","label").show()

可以看出从源数据的第4列(即新数据的第2列)开始已经没有?号了,所有的?都被0取代了。

将数据按7:3比例拆分,并缓存

trainDFSplitTrain,testDFSplitTrain=trainDF.randomSplit([0.7,0.3])
trainDFSplitTrain.cache()
testDFSplitTrain.cache()

注:cache()方法其实调用的就是persist方法,缓存策略均为MEMORY_ONLY;可以通过persist方法手工设定StorageLevel来满足工程需要的存储级别;

查询拆分后的数量:

trainDFSplitTrain.count()
testDFSplitTrain.count()

16.3.7 StringIndexer

StringIndexer继承自Estimator,处理时需要先fit,然后再transform。

本部分目标针对alchemy_category进行编号alchemy_category_index,并经编号作为新的字段添加

引入StringIndexer:

from pyspark.ml.feature import StringIndexer

新建StringIndexer,针对类型字段alchemy_category,产生新字段alchemy_category_index:

categoryIndexer=StringIndexer(inputCol='alchemy_category', outputCol='alchemy_category_index')

先对源数据进行fit,产生Transform:

categoryTransform=categoryIndexer.fit(trainDF)

查看Transform内容:

categoryTransform.labels

可以看出,将alchemy_category字段特征值提取出来了

利用产生的Tranform,针对拆分后的DataFrame进行Transform运算。

trainDFSplitTrainIndexer=categoryTransform.transform(trainDFSplitTrain)

运算结果是增加了列alchemy_category_index

查看具体增加字段及对应值:

trainDFSplitTrainIndexer.select("url","alchemy_category","alchemy_category_index").show()

可以看出新增加的列,为对类型编号的索引值。

16.3.8 OneHotEncoder

接着将alchemy_category_index的分类特征字段转换为多个字段的vector,比如有14中类型,该行数据为第三种,则转换为(14,[3],[1.0])即(0,0,0,1,0,0,0,0,0,0,0,0,0,0),并新增字段alchemy_category_indexVec

引入OneHotEncoder:

from pyspark.ml.feature import OneHotEncoder

创建OneHotEncoder,针对alchemy_category_index,新增加转换后的字段为alchemy_category_indexVec’

onHotEncoder=OneHotEncoder(dropLast=False,
                          inputCol='alchemy_category_index',
                          outputCol='alchemy_category_indexVec')

Transform计算:针对StringIndexer后的DataFrame进行OneHotEncoder运算

trainDFSplitTrainEncoder=onHotEncoder.transform(trainDFSplitTrainIndexer)

查看是否增加了新字段alchemy_category_indexVec

查看新增加的字段内容:

trainDFSplitTrainEncoder.select("alchemy_category","alchemy_category_index",'alchemy_category_indexVec').show()

可以看到新增加的字段vector为(类型总数,类型编号,字段值[1.0]),

16.3.9 VectorAssembler

将多个特征字段整合成vector,此处特征值包括alchemy_category_indexVec和【2:-1】即从alchemy_category_score开始直到最后,不包括最后的label字段。并新增字段features

引入VectorAssembler:

from pyspark.ml.feature import VectorAssembler

设置需要整合的特征值

vectorAssemblerInputs=['alchemy_category_indexVec']+trainDF.columns[2:-1]
vectorAssemblerInputs

创建VectorAssembler,输入为整合的特征值,输出为计算后的特征值

vectorAssembler=VectorAssembler(inputCols=vectorAssemblerInputs, outputCol="features")

对OneHotEncoder后的DataFrame进行transform,得到新的DataFrame

trainDFSplitTrainAssembler=vectorAssembler.transform(trainDFSplitTrainEncoder)

查看列,新否新增加了features字段

查看特征值内容:

trainDFSplitTrainAssembler.select('alchemy_category_indexVec','features').take(1)

总共36字段,其中类型为14个字段,原始数据27个字段里面去除前面4个和后面1个,剩下22个。

格式为(编号:值),注意为0的没有显示。

目前为止,特征码里面包含了类型vector及源数据其他特征数据。

16.3.10 DecisionTreeClassifier

已经准备好features和label字段后,就可以执行二元分类了。

引入DecisionTreeClassifier:

from pyspark.ml.classification import DecisionTreeClassifier

构建二元树:

dtc=DecisionTreeClassifier(labelCol="label",featuresCol="features",                           impurity="gini",maxDepth=10,maxBins=12)

训练模型:

trainModel=dtc.fit(trainDFSplitTrainAssembler)
trainModel

根据模型进行预测:

trainDFSplitTrainModel=trainModel.transform(trainDFSplitTrainAssembler)

查看结果:

trainDFSplitTrainModel.select('url','rawPrediction','probability','prediction').show()
16.3.10.1 Pipeline建立及预测

之前利用Pipeline组件建模并预测,下一步根据组件进行流程化。

16.3.10.1.1 建立流程

引入Pipeline:

from pyspark.ml import Pipeline

建立Pipeline,重新构建四大组件:

stringIndexer=StringIndexer(inputCol='alchemy_category', outputCol='alchemy_category_index')
oneHotEncoder=OneHotEncoder(dropLast=False, inputCol='alchemy_category_index', outputCol='alchemy_category_indexVec')
vectorAssemblerInputs=['alchemy_category_indexVec']+trainDF.columns[2:-1]
vectorAssembler=VectorAssembler(inputCols=vectorAssemblerInputs, outputCol="features")
decisionTreeClassifier=DecisionTreeClassifier(labelCol="label",featuresCol="features", impurity="gini",maxDepth=10,maxBins=12)
pipeline=Pipeline(stages=stringIndexer,oneHotEncoder,vectorAssembler,decisionTreeClassifier])

查看阶段:pipeline.getStages()

16.3.10.1.2 训练模型

使用训练数据建立模型,相当于将之前几个组件阶段重新做一遍

pipelineModel=pipeline.fit(trainDFSplitTrain)

查看模型每个阶段:

pipelineModel.stages[0]
pipelineModel.stages[1]
pipelineModel.stages[2]
pipelineModel.stages[3]
16.3.10.1.3 预测

预测:

predictedDF=pipelineModel.transform(testDFSplitTrain)

查看预测结果列,多了’rawPrediction’,’probability’, ‘prediction’

predictedDF.columns

具体查看内容:

predicted.select('url','features','rawPrediction','probability','prediction').show()

其中:

  • ‘rawPrediction’:用于评估模型准确率
  • ‘probability’:显示0或者1的概率
  • ‘prediction’:预测结果0或者1,
16.3.10.2 模型准确性评估

即通过AUC计算模型准确性

引入BinaryClassificationEvaluator:

from pyspark.ml.evaluation import BinaryClassificationEvaluator

创建BinaryClassificationEvaluator,使用之前计算值,实际值,计算AUC

binaryClassificationEvaluator=BinaryClassificationEvaluator(
    rawPredictionCol="rawPrediction",
    labelCol="label",
    metricName="areaUnderROC")

训练模型:

predictedDF=pipelineModel.transform(testDFSplitTrain)

评估模型:

binaryClassificationEvaluator.evaluate(predictedDF)

查看对比情况

16.3.10.3 计算最佳模型
16.3.10.3.1 TrainValidationSplit

引入模块:

from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import TrainValidationSplit

创建ParamGridBuilder,设置测试参数值,此处相当于进行2*2*2=8次

paramGridBuilder=ParamGridBuilder()\
    .addGrid(decisionTreeClassifier.impurity,["gini","entropy"])\
    .addGrid(decisionTreeClassifier.maxDepth,[5,15])\
    .addGrid(decisionTreeClassifier.maxBins,[10,20])\
    .build()

创建TrainValidationSplit,按8:2完成训练和校验数据

trainValidationSplit=TrainValidationSplit(estimator=decisionTreeClassifier,
    evaluator=binaryClassificationEvaluator,
    estimatorParamMaps=paramGridBuilder,
    trainRatio=0.8)

建立pipeline:

tvsPipeline=Pipeline(
    stages=[stringIndexer,oneHotEncoder,vectorAssembler,trainValidationSplit])

利用训练数据trainDFSplitTrain训练最佳模型,内部会将该数据分成8:2进行训练和校验:

tvsPipelineModel=tvsPipeline.fit(trainDFSplitTrain)

利用最佳模型预测测试数据:

prediction=tvsPipelineModel.transform(testDFSplitTrain)

计算最佳模型下的AUC:

binaryClassificationEvaluator.evaluate(prediction)
16.3.10.3.2 crossValidation

操作流程类似前面

利用k-Fold交叉验证方式得到稳定模型,k越大效果越好,但是计算时间也会越长,此处用k=3,即将数据分成3块,三种组合情况下测试。相当于进行3*8=24次

引入CrossValidator:

from pyspark.ml.tuning import CrossValidator

建立CrossValidator,设置K=3

crossValidator=CrossValidator(estimator=decisionTreeClassifier,
                              evaluator=binaryClassificationEvaluator,
                              estimatorParamMaps=paramGridBuilder,
                              numFolds=3)

建立Pipeline

cvPipeline=Pipeline(
    stages=[stringIndexer,oneHotEncoder,vectorAssembler,crossValidator])

利用训练数据trainDFSplitTrain训练最佳模型

cvPipelineModel=cvPipeline.fit(trainDFSplitTrain)

利用最佳模型预测测试数据

prediction= cvPipelineModel.transform(testDFSplitTrain)

计算最佳模型下的AUC

binaryClassificationEvaluator.evaluate(prediction)

16.3.11 RandomForestClassifier

16.3.11.1 Pipeline建立及预测

之前使用的都是DecisionTreeClassifier,更换为RandomForestClassifier,即多颗决策树,每颗决策树都是分类器。

引入RandomForestClassifier

from pyspark.ml.classification import RandomForestClassifier

新建RandomForestClassifier,设置决策树为10颗

randomForestClassifier=RandomForestClassifier(labelCol="label",
                                             featuresCol="features",
                                             numTrees=10)

建立pipeline

rfPipeline=Pipeline(stages=[stringIndexer,oneHotEncoder,vectorAssembler,randomForestClassifier])

利用训练数据trainDFSplitTrain训练最佳模型

rfPipelineModel=rfPipeline.fit(trainDFSplitTrain)

利用最佳模型预测测试数据

prediction=rfPipelineModel.transform(testDFSplitTrain)

计算最佳模型下的AUC

binaryClassificationEvaluator.evaluate(prediction)
16.3.11.2 找最佳模型
16.3.11.2.1 TrainValidationSplit

重新创建ParamGridBuilder,添加numTrees,由于运行速度问题,前面几个参数只设置一个值。

paramGridBuilder=ParamGridBuilder()\
                .addGrid(randomForestClassifier.impurity,["gini"])\
                .addGrid(randomForestClassifier.maxDepth,[5])\
                .addGrid(randomForestClassifier.maxBins,[10])\
                .addGrid(randomForestClassifier.numTrees, [10,30])\
                .build()

新建TrainValidationSplit,替换estimator

trainValidationSplit=TrainValidationSplit(estimator=randomForestClassifier,
    evaluator=binaryClassificationEvaluator,
    estimatorParamMaps=paramGridBuilder,
    trainRatio=0.8)

新建pipeline

rfPipeline=Pipeline(stages=[stringIndexer,oneHotEncoder,vectorAssembler,trainValidationSplit])

利用训练数据trainDFSplitTrain训练最佳模型

rfPipelineModel=rfPipeline.fit(trainDFSplitTrain)

利用最佳模型预测测试数据

prediction=rfPipelineModel.transform(testDFSplitTrain)

计算最佳模型下的AUC

binaryClassificationEvaluator.evaluate(prediction)
16.3.11.2.2 CrossValidator

新建CrossValidator

crossValidator=CrossValidator(estimator=randomForestClassifier,
                              evaluator=binaryClassificationEvaluator,
                              estimatorParamMaps=paramGridBuilder,
                              numFolds=3)

新建pipeline

rfPipeline=Pipeline(
    stages=[stringIndexer,oneHotEncoder,vectorAssembler, crossValidator])

利用训练数据trainDFSplitTrain训练最佳模型

rfPipelineModel=rfPipeline.fit(trainDFSplitTrain)

利用最佳模型预测测试数据

prediction=rfPipelineModel.transform(testDFSplitTrain)

计算最佳模型下的AUC

binaryClassificationEvaluator.evaluate(prediction)

16.3.12 显示结果

建立字典

LabelDict={0:"temporary",
          1:"ever"}

显示

for data in prediction.select("url","prediction").take(6):
    print(data[0] + "==>" + LabelDict[data[1]] + "\n")

发表回复