完整目录、平台简介、安装环境及版本:参考《Spark平台(高级版)概览》
十六、Spark ML Pipeline机器学习
Spark ML Pipeline 的引入,对目标数据集结构复杂,需要多次处理,或是在学习过程中,要使用多个转化器 (Transformer) 和预测器 (Estimator),这种情况下使用 MLlib 将会让程序结构极其复杂。所以,一个可用于构建复杂机器学习工作流应用的新库已经出现了,它就是 Spark 1.2 版本之后引入的 ML Pipeline。ML Pipeline 是建立在 DataFrames 上的更高层次的 API 库,可以使用户很方便的将对数据的不同处理阶段组合起来运行,从而使整个机器学习过程变得更加易用、简洁、规范和高效。
16.1 Pipeline组件
Spark ML Pipeline主要包含2个核心的数据处理组件:Transformer、Estimator,其中它们都是Pipeline中PipelineStage的子类,另外一些抽象,如Model、Predictor、Classifier、Regressor等都是基于这两个核心组件衍生出来,比如,Model是一个Transformer,Predictor是一个Estimator,它们的关系如下类图所示:
16.1.1 DataFrame数据集
Spark ML Pipeline使用DataFrame作为机器学习输入输出数据集的抽象。
DataFrame来自Spark SQL,表示对数据集的一种特殊抽象,它也是Dataset(它是Spark 1.6引入的表示分布式数据集的抽象接口),但是DataFrame通过为数据集中每行数据的每列指定列名的方式来组织Dataset,类似于关系数据库中的表,同时还在底层处理做了非常多的优化。DataFrame可以基于不同的数据源进行构建,比如结构化文件、Hive表、数据库、RDD等。或者更直白一点表达什么是DataFrame,可以认为它等价于Dataset[Row],表示DataFrame是一个Row类型数据对象的Dataset。
机器学习可以被应用于各种数据类型,例如向量、文本、图片、结构化数据。Spark ML API采用DataFrame的理由是,来自Spark SQL中的DataFrame接口的抽象,可以支持非常广泛的类型,而且表达非常直观,便于在Spark中进行处理。所以说,DataFrame是Spark ML最基础的对数据集的抽象,所有各种ML Pipeline组件都会基于DataFrame构建更加丰富、复杂的数据处理逻辑。
16.1.2 Transformer转换器
Transformer是一个算法,可以将一个 DataFrame 转换成另一个 DataFrame。主要抽象了两类操作:
16.1.2.1 特征变化
一个特征转换器Transformer输入一个DataFrame,读取其中一个或多个文本列,将其映射为新的特征向量列。输出一个新的带有特征向量列的DataFrame。
16.1.2.2 学习模型
一个学习模型转换器输入一个DataFrame,读取其中包括特征向量的列,预测每一个特征向量的标签。输出一个新的带有预测标签列的DataFrame。
Transformer类继承关系如图:
16.1.3 Estimator预测器
Estimator是一个算法。预测器通过 fit() 方法,接收一个 DataFrame 并产出一个模型。
Estimator用来训练模型,它的输入是一个DataFrame,输出是一个Model,Model是Spark ML中对机器学习模型的抽象和定义,Model其实是一个Transformer。一个机器学习算法是基于一个数据集进行训练的,Estimator对基于该训练集的机器学习算法进行了抽象。所以它的输入是一个数据集DataFrame,经过训练最终得到一个模型Model,也就是Transformer,一个Transformer又可以对输入的DataFrame执行变换操作。
Estimator类继承关系如图:
16.1.4 Pipeline
Pipeline 由一系列 stage 组成,形成一个机器学习工作流,每个 stage 为一个转换器 (Transformer) 或预测器 (Estimator)。这些 stage 的执行是按一定顺序的,输入的 DataFrame 在通过每个 stage 时被改变。在转换器阶段,transform() 方法作用在 DataFrame 上。预测器阶段,调用 fit() 方法来产生一个转换器(成为 PipelneModel 的一部分),然后该转换器的 transform() 方法作用在 DataFrame 上。
在机器学习过程中,通过一系列的算法来处理和学习数据是很普遍的,例如,一个简单的文档处理工作流可能包括以下几步:
- 将每个文档分成单个词;
- 将文档中的词转化成数字化的特征向量;
- 基于特征向量和标签学习得到预测模型;
16.1.5 Parameter
Parameter 被用来设置 Transformer 或者 Estimator 的参数。现在,所有转换器和预测器可共享用于指定参数的公共API。
ParamMap是一组(参数,值)对。
16.2 基本过程
构建机器学习一般需要经历的4个主要阶段:数据准备、训练模型、评估模型和应用模型。
16.2.1 数据准备ETL
数据准备阶段,通常会有一个或者多个已经存在的数据集,数据集的状态可能距离生产该数据的源头非常近,数据格式多种多样,不规范、缺失值、错误值随处可见。还有可能,数据集包含多个不同类型的子数据集,单独拿出每一个子数据集对机器学习模型训练,都没有什么意义,除非我们就需要对这些子数据集进行一些处理。
对于这样的输入数据集,如果不加处理而直接拿来进行机器学习模型的训练,一个结果是根本无法使用这样数据集作为生产机器学习模型的输入;另一个结果是可以满足训练机器学习模型算法的输入格式等要求,但是训练出来的机器学习模型根本无法投入生产,带来期望的效果。
面向机器学习的数据准备阶段,可以看成是一个通用的数据ETL(Extract-Transform-Load,用来描述将数据从来源端经过提取、转换、加载至目端的过程)过程,这个ETL过程除了进行基础的规范数据格式、去除噪声、集成数据等,还包含一些机器学习特有的数据ETL过程,比如:特征抽取、降维、主成分分析PCA(Principal Components Analysis旨在利用降维的思想,把多指标转化为少数几个综合指标。)等。
可见,数据准备阶段主要是对数据进行ETL,在此基础上可能需要选择合适的数据分割策略,生成满足机器学习模型训练的训练集,和用于评估模型的测试集。
16.2.2 训练模型
训练模型是构建机器学习应用各个阶段中最核心的阶段。该阶段,首先根据给定的问题域,选择一个适合解决该领域问题的模型,然后考虑基于所选择数据的规模和特点,使用特定的算法来计算生成需要的模型。
模型可以理解为一个数学函数,该函数最终能够满足的效果是,根据给定的输入数据,就能得到或近似得到认为合理的结果。一个数学函数具有一个或多个参数,训练模型的结果就是确定这些参数的值。函数可能很简单,也可能很复杂。数据集可能有其特点,比如数据规模超大、数据在处理过程中精度的损失等等,要在所选择的数据集上进行训练学习,通常不能得到目标函数所有参数理论上的精确值。最终的目标是,能够在给定的数据集上具有很好地表现,可以根据实际情况做特殊处理。在实际应用中,往往提升精度会耗费大量资源和时间,再对比模型带来效果可能微乎其微,所以舍弃一定的精度也能很好地在实际应用中使用该模型。
训练模型就是从给定的数据集学习得到数据中潜在的规律,通过以函数的形式表示,经过计算处理求得目标数学函数对应的全部参数。基于最终得到的参数所构造的函数,能够使函数很好地解决假设的问题(训练数据集),模拟给定训练数据集同时,又具备很好的泛化能力,即不会欠拟合或过拟合。
16.2.3 评估模型
训练模型得到了一组参数,能够模拟给定训练数据集,但是如果对于未来未知的数据,模型的表现会如何?为了解决这个疑问,我们需要将训练得到的模型,作用在给定的测试数据集上,根据结果进行分析,确定模型的精度是否能够满足应用需求。训练数据集和测试数据集唯一不同的就是是否已知标签,而对数据本身的处理逻辑基本都是相同的。
另外,评价模型的优劣,验证模型的好坏,需要选择适合特拟定领域的度量方法,从而对模型进行客观的评价。比如,离线模型的评估,常用准确率、精确率-召回率,而在线模型可能会使用CTR、A/B测试等。
16.2.4 应用模型
一个经过验证可以投入使用的模型,可能会提供一个特殊的结果数据集,根据应用的需要对其进行进一步处理,比如推荐模型中的物品集合很大,可以通过对推荐的物品结果集进行再加工处理,对支持的应用提供快速的查询服务。模型也可能是一个不可读的模型,这种情况可能需要基于该模型开发一个服务,直接对外提供模型服务。
具体的如何使用模型,这要依赖于实际应用的特点和使用方式。
16.3 案例
用ssh远程登录节点app-11,切换到hadoop用户,启动集群。
16.3.1 整体流程
- 利用组件StringIndexer针对alchemy_category进行编号alchemy_category_index,并将编号作为新的字段添加;
- 利用组件OneHotEncoder将alchemy_category_index的分类特征字段转换为多个字段的vector,比如有14中类型,该行数据为第三种,则转换为(14,[3],[1.0])即(0,0,0,1,0,0,0,0,0,0,0,0,0,0),并新增字段alchemy_category_indexVec;
- 利用VectorAssembler将多个特征字段整合成vector,此处特征值包括alchemy_category_indexVec和从alchemy_category_score开始直到最后,不包括最后的label字段。并新增字段features;
- 利用Classifier的fit,根据训练数据训练模型;
- 利用模型对测试数据进行预测,得出新的DataFrame。
16.3.2 数据上传
利用Spark MLlib二元决策树案例中用到的数据源作为测试数据进行网页的预测。
命令:hdfs dfs -ls /user/stumble/data
打开train.tsv文件,第一行为列名,用\t分割开:
test.tsv文件和train.tsv相比,没有label字段,这个文件适用于预测的,最后计算label值。
16.3.3 新建项目
Web页面输入:http://app-11:8888/登录
新建Python3项目
重命名为:MLPipeline
重命名后效果:
16.3.4 创建SparkSession
引入findspark,使用findspark初始化pyspark的依赖
import findspark
findspark.init()
引入:
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SparkSession
创建,基于集群:
sparkSession = SparkSession \
.builder \
.appName("Python Spark SQL Hive integration example") \
.master("spark://app-11:7077") \
.enableHiveSupport() \
.getOrCreate()
16.3.5 读取并分析文件
设置路径:
global Path
Path="hdfs://dmcluster/user/stumble/"
读取文件:CSV格式,将分隔符定位\t,保留头条字段名,用load也可以
trainDF = sparkSession.read.format("csv")\
.option("header","true")\
.option("delimiter","\t")\
.csv(Path+"data/train.tsv")
查看条数:trainDF.count()
DataFrame是带字段名和字段类型的,查看字段类型
trainDF.printSchema()
可以看出,都是默认用string类型。
16.3.6 整理数据
查看部分数据,发现书籍中存在很多”?”
trainDF.select("url","alchemy_category","alchemy_category_score","label").show()
自定义函数,将?转为0
UDF(User-defined functions, UDFs),即用户自定义函数,在Spark Sql的开发中十分常用,UDF对表中的每一行进行函数处理,返回新的值,有些类似与RDD编程中的Map()。
from pyspark.sql.functions import udf
def replace_question(x):
return ("0" if x=="?" else x)
replace_question=udf(replace_question)
读取字段及类型转换用:
from pyspark.sql.functions import col
import pyspark.sql.types
将从从4列开始的所有列中?转为0,同时将这些字段中的类型都转为double。并去除1,2两列数据。
trainDF=trainDF.select(['url','alchemy_category']+
replace_question(col(column)).cast("double").alias(column)
for column in trainDF.columns[4:]])
查看转换并去除1,2列数据后的数据
trainDF.printSchema()
可以看出相对源数据,第1,2列已经去除。并且从第4个字段开始的所有字段类型都由string改为了double。
查看具体数据,确认?已经改为了0
trainDF.select("url","alchemy_category","alchemy_category_score","label").show()
可以看出从源数据的第4列(即新数据的第2列)开始已经没有?号了,所有的?都被0取代了。
将数据按7:3比例拆分,并缓存
trainDFSplitTrain,testDFSplitTrain=trainDF.randomSplit([0.7,0.3])
trainDFSplitTrain.cache()
testDFSplitTrain.cache()
注:cache()方法其实调用的就是persist方法,缓存策略均为MEMORY_ONLY;可以通过persist方法手工设定StorageLevel来满足工程需要的存储级别;
查询拆分后的数量:
trainDFSplitTrain.count()
testDFSplitTrain.count()
16.3.7 StringIndexer
StringIndexer继承自Estimator,处理时需要先fit,然后再transform。
本部分目标针对alchemy_category进行编号alchemy_category_index,并经编号作为新的字段添加
引入StringIndexer:
from pyspark.ml.feature import StringIndexer
新建StringIndexer,针对类型字段alchemy_category,产生新字段alchemy_category_index:
categoryIndexer=StringIndexer(inputCol='alchemy_category', outputCol='alchemy_category_index')
先对源数据进行fit,产生Transform:
categoryTransform=categoryIndexer.fit(trainDF)
查看Transform内容:
categoryTransform.labels
可以看出,将alchemy_category字段特征值提取出来了
利用产生的Tranform,针对拆分后的DataFrame进行Transform运算。
trainDFSplitTrainIndexer=categoryTransform.transform(trainDFSplitTrain)
运算结果是增加了列alchemy_category_index
查看具体增加字段及对应值:
trainDFSplitTrainIndexer.select("url","alchemy_category","alchemy_category_index").show()
可以看出新增加的列,为对类型编号的索引值。
16.3.8 OneHotEncoder
接着将alchemy_category_index的分类特征字段转换为多个字段的vector,比如有14中类型,该行数据为第三种,则转换为(14,[3],[1.0])即(0,0,0,1,0,0,0,0,0,0,0,0,0,0),并新增字段alchemy_category_indexVec
引入OneHotEncoder:
from pyspark.ml.feature import OneHotEncoder
创建OneHotEncoder,针对alchemy_category_index,新增加转换后的字段为alchemy_category_indexVec’
onHotEncoder=OneHotEncoder(dropLast=False,
inputCol='alchemy_category_index',
outputCol='alchemy_category_indexVec')
Transform计算:针对StringIndexer后的DataFrame进行OneHotEncoder运算
trainDFSplitTrainEncoder=onHotEncoder.transform(trainDFSplitTrainIndexer)
查看是否增加了新字段alchemy_category_indexVec
查看新增加的字段内容:
trainDFSplitTrainEncoder.select("alchemy_category","alchemy_category_index",'alchemy_category_indexVec').show()
可以看到新增加的字段vector为(类型总数,类型编号,字段值[1.0]),
16.3.9 VectorAssembler
将多个特征字段整合成vector,此处特征值包括alchemy_category_indexVec和【2:-1】即从alchemy_category_score开始直到最后,不包括最后的label字段。并新增字段features
引入VectorAssembler:
from pyspark.ml.feature import VectorAssembler
设置需要整合的特征值
vectorAssemblerInputs=['alchemy_category_indexVec']+trainDF.columns[2:-1]
vectorAssemblerInputs
创建VectorAssembler,输入为整合的特征值,输出为计算后的特征值
vectorAssembler=VectorAssembler(inputCols=vectorAssemblerInputs, outputCol="features")
对OneHotEncoder后的DataFrame进行transform,得到新的DataFrame
trainDFSplitTrainAssembler=vectorAssembler.transform(trainDFSplitTrainEncoder)
查看列,新否新增加了features字段
查看特征值内容:
trainDFSplitTrainAssembler.select('alchemy_category_indexVec','features').take(1)
总共36字段,其中类型为14个字段,原始数据27个字段里面去除前面4个和后面1个,剩下22个。
格式为(编号:值),注意为0的没有显示。
目前为止,特征码里面包含了类型vector及源数据其他特征数据。
16.3.10 DecisionTreeClassifier
已经准备好features和label字段后,就可以执行二元分类了。
引入DecisionTreeClassifier:
from pyspark.ml.classification import DecisionTreeClassifier
构建二元树:
dtc=DecisionTreeClassifier(labelCol="label",featuresCol="features", impurity="gini",maxDepth=10,maxBins=12)
训练模型:
trainModel=dtc.fit(trainDFSplitTrainAssembler)
trainModel
根据模型进行预测:
trainDFSplitTrainModel=trainModel.transform(trainDFSplitTrainAssembler)
查看结果:
trainDFSplitTrainModel.select('url','rawPrediction','probability','prediction').show()
16.3.10.1 Pipeline建立及预测
之前利用Pipeline组件建模并预测,下一步根据组件进行流程化。
16.3.10.1.1 建立流程
引入Pipeline:
from pyspark.ml import Pipeline
建立Pipeline,重新构建四大组件:
stringIndexer=StringIndexer(inputCol='alchemy_category', outputCol='alchemy_category_index')
oneHotEncoder=OneHotEncoder(dropLast=False, inputCol='alchemy_category_index', outputCol='alchemy_category_indexVec')
vectorAssemblerInputs=['alchemy_category_indexVec']+trainDF.columns[2:-1]
vectorAssembler=VectorAssembler(inputCols=vectorAssemblerInputs, outputCol="features")
decisionTreeClassifier=DecisionTreeClassifier(labelCol="label",featuresCol="features", impurity="gini",maxDepth=10,maxBins=12)
pipeline=Pipeline(stages=stringIndexer,oneHotEncoder,vectorAssembler,decisionTreeClassifier])
查看阶段:pipeline.getStages()
16.3.10.1.2 训练模型
使用训练数据建立模型,相当于将之前几个组件阶段重新做一遍
pipelineModel=pipeline.fit(trainDFSplitTrain)
查看模型每个阶段:
pipelineModel.stages[0]
pipelineModel.stages[1]
pipelineModel.stages[2]
pipelineModel.stages[3]
16.3.10.1.3 预测
预测:
predictedDF=pipelineModel.transform(testDFSplitTrain)
查看预测结果列,多了’rawPrediction’,’probability’, ‘prediction’
predictedDF.columns
具体查看内容:
predicted.select('url','features','rawPrediction','probability','prediction').show()
其中:
- ‘rawPrediction’:用于评估模型准确率
- ‘probability’:显示0或者1的概率
- ‘prediction’:预测结果0或者1,
16.3.10.2 模型准确性评估
即通过AUC计算模型准确性
引入BinaryClassificationEvaluator:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
创建BinaryClassificationEvaluator,使用之前计算值,实际值,计算AUC
binaryClassificationEvaluator=BinaryClassificationEvaluator(
rawPredictionCol="rawPrediction",
labelCol="label",
metricName="areaUnderROC")
训练模型:
predictedDF=pipelineModel.transform(testDFSplitTrain)
评估模型:
binaryClassificationEvaluator.evaluate(predictedDF)
查看对比情况
16.3.10.3 计算最佳模型
16.3.10.3.1 TrainValidationSplit
引入模块:
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import TrainValidationSplit
创建ParamGridBuilder,设置测试参数值,此处相当于进行2*2*2=8次
paramGridBuilder=ParamGridBuilder()\
.addGrid(decisionTreeClassifier.impurity,["gini","entropy"])\
.addGrid(decisionTreeClassifier.maxDepth,[5,15])\
.addGrid(decisionTreeClassifier.maxBins,[10,20])\
.build()
创建TrainValidationSplit,按8:2完成训练和校验数据
trainValidationSplit=TrainValidationSplit(estimator=decisionTreeClassifier,
evaluator=binaryClassificationEvaluator,
estimatorParamMaps=paramGridBuilder,
trainRatio=0.8)
建立pipeline:
tvsPipeline=Pipeline(
stages=[stringIndexer,oneHotEncoder,vectorAssembler,trainValidationSplit])
利用训练数据trainDFSplitTrain训练最佳模型,内部会将该数据分成8:2进行训练和校验:
tvsPipelineModel=tvsPipeline.fit(trainDFSplitTrain)
利用最佳模型预测测试数据:
prediction=tvsPipelineModel.transform(testDFSplitTrain)
计算最佳模型下的AUC:
binaryClassificationEvaluator.evaluate(prediction)
16.3.10.3.2 crossValidation
操作流程类似前面
利用k-Fold交叉验证方式得到稳定模型,k越大效果越好,但是计算时间也会越长,此处用k=3,即将数据分成3块,三种组合情况下测试。相当于进行3*8=24次
引入CrossValidator:
from pyspark.ml.tuning import CrossValidator
建立CrossValidator,设置K=3
crossValidator=CrossValidator(estimator=decisionTreeClassifier,
evaluator=binaryClassificationEvaluator,
estimatorParamMaps=paramGridBuilder,
numFolds=3)
建立Pipeline
cvPipeline=Pipeline(
stages=[stringIndexer,oneHotEncoder,vectorAssembler,crossValidator])
利用训练数据trainDFSplitTrain训练最佳模型
cvPipelineModel=cvPipeline.fit(trainDFSplitTrain)
利用最佳模型预测测试数据
prediction= cvPipelineModel.transform(testDFSplitTrain)
计算最佳模型下的AUC
binaryClassificationEvaluator.evaluate(prediction)
16.3.11 RandomForestClassifier
16.3.11.1 Pipeline建立及预测
之前使用的都是DecisionTreeClassifier,更换为RandomForestClassifier,即多颗决策树,每颗决策树都是分类器。
引入RandomForestClassifier
from pyspark.ml.classification import RandomForestClassifier
新建RandomForestClassifier,设置决策树为10颗
randomForestClassifier=RandomForestClassifier(labelCol="label",
featuresCol="features",
numTrees=10)
建立pipeline
rfPipeline=Pipeline(stages=[stringIndexer,oneHotEncoder,vectorAssembler,randomForestClassifier])
利用训练数据trainDFSplitTrain训练最佳模型
rfPipelineModel=rfPipeline.fit(trainDFSplitTrain)
利用最佳模型预测测试数据
prediction=rfPipelineModel.transform(testDFSplitTrain)
计算最佳模型下的AUC
binaryClassificationEvaluator.evaluate(prediction)
16.3.11.2 找最佳模型
16.3.11.2.1 TrainValidationSplit
重新创建ParamGridBuilder,添加numTrees,由于运行速度问题,前面几个参数只设置一个值。
paramGridBuilder=ParamGridBuilder()\
.addGrid(randomForestClassifier.impurity,["gini"])\
.addGrid(randomForestClassifier.maxDepth,[5])\
.addGrid(randomForestClassifier.maxBins,[10])\
.addGrid(randomForestClassifier.numTrees, [10,30])\
.build()
新建TrainValidationSplit,替换estimator
trainValidationSplit=TrainValidationSplit(estimator=randomForestClassifier,
evaluator=binaryClassificationEvaluator,
estimatorParamMaps=paramGridBuilder,
trainRatio=0.8)
新建pipeline
rfPipeline=Pipeline(stages=[stringIndexer,oneHotEncoder,vectorAssembler,trainValidationSplit])
利用训练数据trainDFSplitTrain训练最佳模型
rfPipelineModel=rfPipeline.fit(trainDFSplitTrain)
利用最佳模型预测测试数据
prediction=rfPipelineModel.transform(testDFSplitTrain)
计算最佳模型下的AUC
binaryClassificationEvaluator.evaluate(prediction)
16.3.11.2.2 CrossValidator
新建CrossValidator
crossValidator=CrossValidator(estimator=randomForestClassifier,
evaluator=binaryClassificationEvaluator,
estimatorParamMaps=paramGridBuilder,
numFolds=3)
新建pipeline
rfPipeline=Pipeline(
stages=[stringIndexer,oneHotEncoder,vectorAssembler, crossValidator])
利用训练数据trainDFSplitTrain训练最佳模型
rfPipelineModel=rfPipeline.fit(trainDFSplitTrain)
利用最佳模型预测测试数据
prediction=rfPipelineModel.transform(testDFSplitTrain)
计算最佳模型下的AUC
binaryClassificationEvaluator.evaluate(prediction)
16.3.12 显示结果
建立字典
LabelDict={0:"temporary",
1:"ever"}
显示
for data in prediction.select("url","prediction").take(6):
print(data[0] + "==>" + LabelDict[data[1]] + "\n")