Spark平台(高级版九)Spark案例

完整目录、平台简介、安装环境及版本:参考《Spark平台(高级版)概览》

九、Spark快速大数据处理综合案例实战

9.1 SparkSQL/DataFrame API

9.1.1 API概述

Python版本的API:http://spark.apache.org/docs/latest/api/python/index.html

包括基本功能、流处理、机器学习、sql等模块接口,本课程重心在SQL模块。

本节不深入到内部讲一个个接口函数,接口函数太多,通过例子讲解API,讲解API的使用基本规范,熟悉后,需要用到什么接口,可以查相关文档。

9.1.1.1 数据帧

如图,DataFrame可以认为是Excel里面的一个表格,这个表格里面有表头,对应该列的含义,还有很多行,相当于数据库里面的行,不同列可以是不同的类型,大小可变。

9.2 淘宝用户行为分析案例

以淘宝用户行为分析作为案例,讲解SQL以及DataFrame的API,同时也通过这种API给做大数据分析,帮助大家理解怎么分析数据,特别是电商的数据,而且是集中在用户的行为分析集上。

9.2.1 数据分析任务

通过用户的注册、登录、浏览、加购物车、购买这一序列的行为数据分析网站的PV、UV以及用户的各种转化率,如预览之后是不是产生了购买行为,添加购物车之后是否产生了购买行为,然后通过这种行为分析发掘:

  1. 向什么用户推荐新的商品;
  2. 什么样的商品会存在可能的问题,比如商品有可能需要下架,因为潜在用户对这种商品的评价差、转化率不好、复购率不好。

9.2.2 数据源

9.2.2.1 下载
从淘宝的天池下载数据源,地址为:https://tianchi.aliyun.com/dataset/dataDetail?dataId=46

需要使用自己的淘宝账号进行下载,有12256906条记录,此处不提供数据的下载,虽然说数据免费公开使用,可能涉及某些商业的版权信息,大家可以自己下载。

这个是淘宝发展初期数据,数据规模比较小,此外还包括了双12活动数据,通过双12的数据和前后数据的对比可以直观的看到活动对互联网平台产生的流量的影响。

9.2.2.2 数据格式

数据包括了这么几列,按照行存储,一行包含了用户ID、商品ID、商品类目ID、行为类型、时间戳。其中用户ID是序列化后的,即脱敏后的数据。

行为类型:包括PV、buy、cart、fav。

数据时间戳设定精度到小时,如2014年12月12号20点,没有带分秒即毫秒。

由于数据分析中需要加载数据分析的逻辑,比如你点击了一个页面,然后才会产生购买行为,而不是购买行为在点击页面之前发生。为了分析正常逻辑的顺序,需要严格的事件发生的时间戳进行排序,这个提供的数据无法排序,用户可能做很多动作,但是无法做一个时间上的排序。虽然按照个人理解,生成文本的行,谁先发生谁在前。

本次不做这个假定,做这个假定在数据分析里面会有一些困难,因为做大数据分析通过MapReduce,MapReduce不去取所在文本行顺序作为时间的先后顺序,因为做MapReduce之后行的信息丢失掉了,虽然可以把行的信息拿起来,但是一般我们不这么做。如在做Map时,读取文本信息,以行作为Key,以行的字符串作为Value,但是在做Map或Reduce操作时,会把行信息丢弃掉。

9.2.3 数据加载

加载数据到数据仓库,使用Hive,数据库仓库表的定义如下:

其中有个struct,由于create_time是用2014年12月12日20点表示,需要将这个数据切割开来,切割成create_day和create_hour,这种切割必然会带来需要界定符,行的界定符是英文的逗号,列的界定符是空格,而且txt文本第一行是header信息,需要把第一行信息去掉。

CREATE TABLE IF NOT EXISTS `TIANCHI` 
(  
  `user_id` bigint,  
  `item_id` bigint,  
  `behavior_type` bigint,  
  `user_geohash` string,  
  `item_category` bigint,  
  `create_time` struct<create_day:string, create_hour:string>
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' COLLECTION ITEMS TERMINATED BY ' ' tblproperties("skip.header.line.count"="1");

如数据:

用ssh登录app-11,切换到hadoop用户,并启动集群:./startAll.sh

由于Hive安装在app-12,用ssh连接app-12,并切换到hadoop用户。

创建目录:mkdir /tmp/spark。

上传文件tianchi_mobile_recommend_train_user.zip至/tmp/spark

在解压缩之前需要先安装unzip命令:yum install -y unzip zip

解压缩数据文件:unzip tianchi_mobile_recommend_train_user.zip

确保ssh终端为linux模式:

进入hive客户端:hive –service cli

进入数据库test:

show databases;
use test;

打开数据库创建文件:

创建表:以’,’为分隔符,跨过第一行。

导入数据:

LOAD DATA LOCAL INPATH '/tmp/spark/tianchi_mobile_recommend_train_user.csv' OVERWRITE INTO TABLE TIANCHI;

查看对应HDFS

验证加载过程是否正确:

设置map和reduce内容:

set mapreduce.map.memory.mb=1024;
set mapreduce.reduce.memory.mb=1024;

查询表的条数:

查询过程中启动Tez MapReduce的JOB,查询到的数据量为12256906。

数据有1200多万条,本次基于电脑内存有限只截取前120多万条。

做一个初步的分析,条数正确的情况下,查看数据包含的时间区间情况并排序:

select create_hour from (select distinct create_time.create_hour from tianchi)u order by u.create_hour;

可以看出,即使是淘宝初期,每个小时都是有成交量的。

再查询每天的情况:

select create_day from (select distinct create_time.create_day from tianchi)u order by u.create_day;

可以看出时间是从2014-11-18到2014-12-18,每天都有成交量。

接下来统计每天发生的数据量,操作预览、加购物车等行为的数据量。

select create_time.create_day,count(*) from tianchi group by create_time.create_day order by create_time.create_day;

可以看出双12当天起到了一定的作用,流量增长接近2倍,毕竟当时双12影响还是小。

关闭hive客户端。

9.2.4 用户行为分析

9.2.4.1 任务1- PV分布分析-日期

PV即用户访问流量分析

9.2.4.1.1介绍

如何反映双12流量的增长情况。

如何反应每一个时段用户的访问量,可以看出21:00-22:00用户访问量高。

Web登录Jupyter: app-11:8888

输入访问密码后进入:

新建一个python3的项目

重命名为:user-pv

9.2.4.1.2 上传文件

上传用户分析行为代码到/home/hadoop/.local/:

刷新web页面后就能看到上传的代码:

点击pageViewByDate.ipynb,进入

根据每一天情况分析总体流量。生成每天流量分析图。

为了查看日志日志信息,打开Jupyter日志:tailf /tmp/jupyter.log

接下来一步步执行,也可以自己在新建项目中输入并运行。

9.2.4.1.3 调度模式

在将调度模式前,以例子为引子进行讲解

9.2.4.1.3.1 实例

创建sparkSession,相当于客户端。

同时创建DataFrame,相当于视图,在视图之上就可以加载SQL语句查询。

注:本例直接用spark读对应文件,前面已经讲过了SQL进入Hive的查询,本次换一种方式对比。

为了使用pyspark,引入对应模块

import findspark
findspark.init()

导入其他需要用到的模块

from pyspark import SparkContext
from pyspark import SparkConf
from os.path import expanduser, join, abspath
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import *
import matplotlib.pyplot as plt
from pyspark.sql.types import StructField
import numpy as np
import pandas as pd
import matplotlib.dates as mdate

设置路径,HDFS存储路径

warehouse_location = abspath('/user/hive/warehouse')

创建SparkSession,主节点为app-11,

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL Hive integration example") \
    .config("spark.sql.warehouse.dir", warehouse_location) \
    .master("spark://app-11:7077") \
    .getOrCreate()

按CSV格式读取源文件

df = spark.read.format("CSV").option("header","true").\
option("timestampFormat ","yyyy-MM-dd'T'HH").\
schema("user_id int,item_id int,behavior_type int,user_geohash string,item_category int,create_time string").\
load("/user/hive/warehouse/test.db/tianchi/tianchi_mobile_recommend_train_user.csv")

注:DataFrame是需要带格式的

创建临时视图,视图周期和SparkSession一致

df.createOrReplaceTempView("taobao")
9.2.4.1.3.2 四种模式

目前常用的主要有如下四中调度模式:

不启用集群的模式:local模式

import findspark
findspark.init()
from pyspark import SparkContext
sc = SparkContext(“local”, “First APP”)

Spark自带主从结构standalone,本例子一主三从。提交任务的端口是app-11:7077

import findspark
findspark.init()
from pyspark import SparkContext
sc=SparkContext(“spark://app-11:7077”,”First App”)

Yarn模式,不适合做交互式查询,该模式需要将文件通过submit提交给集群
mport findspark

findspark.init()
from pyspark import SparkContext
sc=SparkContext(“yarn-client”,”First App”)

Yarn集群模式,一般都是开发完成后,直接用来执行的,不适用于交互模式

9.2.4.1.4 画图工具包

继续下一步之前,介绍画图工具包。

Pandas是一个开源的数据处理工具,在数据分析里面,经常会用到,特别是小数据集上,因为Pandas的基本原理将数据加载到内存,然后做数据分析,由于将整个数据集加载到内存,会给内存带来很大的增长,所以Pandas不适合处理大数据集。但是Pandas的画图和数据处理等工具还是很好用的,而且Python也对其支持很好。

Matplotlib这个用来Python画图,可以画二维的,三维的都可以。

9.2.4.1.5 Apache Arrow

Apache Arrow是一种跨平台的数据交换格式框架,提供数据转换功能,需要画图、做某些简单的数据处理,需要将数据转换为Pandas等对象。Spark创建的对象DataFrame是Spark的对象,直接加载进Pandas操作函数会报错,需要有个工具将数据类型转换为Pandas识别的数据类型,展现处理结果是非常适用。

所以使用Spark集群做数据处理,使用Pandas展现。Apache Arrow就提供了这两个之间进行沟通的桥梁。

如图展示Apache Arrow所能做的事情

9.2.4.1.5.1 安装

上传安装文件到/tmp/spark目录:

安装:

pip install pyarrow-0.13.0-cp37-cp37m-manylinux1_x86_64.whl

程序启用pyarrow

spark.conf.set("spark.sql.execution.arrow.enabled", "true")
9.2.4.1.6 惰性求值与缓存

按天查询流量信息:同时缓存查询结果,只做语法检查,并没有提交给集群执行。

Spark中有惰性求值和缓存两个概念,如下Spark SQL相当于转换操作,对数据进行了一次变化,由于还不需要用到变化的结果,就没必要把结果计算出来,只有当需要时,才将转换操作做一遍,符合DAG模式的大数据处理逻辑。相当于定义DAG,定义逻辑关系图,当我们没有使用或者说没有定义完DAG,不去将DAG提交给集群执行,执行动作需要触发点,即行动操作。

转换操作和执行操作带来的就是惰性求值,即没有明确使用结果的时候,并不提交给集群求值,这种特征就叫惰性求值。

9.2.4.1.7 查询并展示

通过视图计算每天量

sqlDF2 = spark.sql("SELECT date(to_timestamp(create_time)) as create_day, count(*) as pbcf FROM taobao group by date(to_timestamp(create_time)) order by date(to_timestamp(create_time))").cache()
sqlDF2.show()

重命名:date(to_timestamp(create_time)) as create_day

重命名:count() as pbcf

按天分类:group by date(to_timestamp(create_time))

按天排序:order by date(to_timestamp(create_time))

再做一次show操作,速度很快,说明数据已经被缓存到内存中了。

再创建一个视图,用SQL语句方式处理。

sqlDF2.createOrReplaceTempView("pbcf")

再次执行,执行很快出结果:

sqlDF = spark.sql("select cast(create_day as string)as create_day, pbcf from pbcf").cache()
sqlDF.show()

类型转换:cast(create_day as string)as create_day

将DataFrame转为Pandas可以识别的数据类型。有个警告可以忽略。

pbcfPandasDF = sqlDF.toPandas()
pbcfPandasDF

创建完成后就可以画图了

X轴时间,格式为create_day,时间间隔,显示是斜45度。执行完成后,结果图就出来了。

fig=plt.figure(figsize=(30,6))//宽30,高6
ax=fig.add_subplot(1,1,1)//行列数
ax.xaxis.set_major_formatter(mdate.DateFormatter('%Y-%m-%d'))// 标签文本的格式
date_series=pd.date_range(pbcfPandasDF['create_day'][0],pbcfPandasDF['create_day'][30],freq='D')//范围
plt.xticks(date_series,rotation=45)//X轴范围和角度
ax.plot(date_series,pbcfPandasDF.set_index('create_day'))
plt.show()
9.2.4.1.8 查看日志及应用

页面查看:app-11:8080

9.2.4.1.9 总结
  • 加载包,并处理化findspark,就可以使用pyspark了。
  • 创建sparkSession,基于Spark standalone集群。
  • 创建DataFrame,通过原始文件加载源数据,初始化DataFrame。
  • 加载并安装Apache Arrow,便于将DataFrame转换为Pandas,便于画图。
  • 通过Spark SQL语句创建另一个DataFrame,且将DataFrame缓存。
  • 为了将DataFrame转换为Pandas,需要将日期格式转换为string,便于Arrow支持。
  • 将结果画图并展现以及保存。
9.2.4.1.10 未退出处理

做完这些后,需要清理,如果不清理环境,直接关闭执行页面:

通过app-11:8080发现还在运行中。

在Running页面,显示一个是新建的,一个是资源提供的都在运行中。

点击关闭即可,否则会影响后续其他运行。提示资源不够。

关闭后再查看运行情况,显示没有在运行的

9.2.4.2 任务1- PV分布分析-小时

接下来统计,更加时间统计流量信息,可以直接用资源提提供的,也可以新建项目运行。

新建项目user-pv-hour

方式参考之前方式,只是将统计点由日期修改为小时了

导入需要的模块

import findspark
findspark.init()
from pyspark import SparkContext
from pyspark import SparkConf
from os.path import expanduser, join, abspath
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import *
import matplotlib.pyplot as plt
from pyspark.sql.types import StructField
import numpy as np
import pandas as pd
import matplotlib.dates as mdate

创建SparkSession,导入CSV数据,创建视图

spark = SparkSession \
.builder \
.appName("Python Spark SQL Hive integration example") \
.config("spark.sql.warehouse.dir", warehouse_location) \
.master("spark://app-11:7077") \
.getOrCreate()
df = spark.read.format("CSV").option("header","true").\
option("timestampFormat ","yyyy-MM-dd'T'HH").\
schema("user_id int,item_id int,behavior_type int,user_geohash string,item_category int,create_time string").\
load("/user/hive/warehouse/test.db/tianchi/tianchi_mobile_recommend_train_user.csv")
df.createOrReplaceTempView("taobao")

启用pyarrow

spark.conf.set("spark.sql.execution.arrow.enabled", "true")

根据视图统计数量

spark.sql("SELECT count(*) FROM taobao").show()

查询时间和销量对应数据,并缓存

sqlDF = spark.sql("SELECT hour(to_timestamp(create_time)) as create_hour, count() as pbcf FROM taobao group by hour(to_timestamp(create_time)) order by hour(to_timestamp(create_time))") sqlDF.cache() 

重命名:hour(to_timestamp(create_time)) as create_hour
重命名:count() as pbcf
按天分类:group by hour(to_timestamp(create_time))
按天排序:order by hour(to_timestamp(create_time))

显示查询的数据,可以看出晚上9-10点销量最高

将DataFrame格式数据转换为Pandas,适合画图

pbcfPandasDF = sqlDF.toPandas()

画图

fig=plt.figure(figsize=(30,6))//大小
ax=fig.add_subplot(1,1,1)//行列数
date_series=pbcfPandasDF['create_hour']//数据区间
plt.xticks(date_series)//X轴数据
ax.plot(date_series,pbcfPandasDF.set_index('create_hour'))
plt.show()"

最后记得要清空

spark.catalog.clearCache()

关闭spark,结束整个sparkSession。

spark.stop()

通过app-11:8080查看程序关闭了

在app-11:8888 Running页面手动点击关闭

9.2.4.3 任务2- DAU分析
9.2.4.3.1 介绍

分析结果展示整个数据集在11-18到12-18区间,每一天用户的活跃情况。

9.2.4.3.2 上传文件

上传文件资源DailyActiveUsers.ipynb到/home/spark/.local

9.2.4.3.3 分析

从app-11:8888页面查看,此处也可以新建user-dau项目

打开文件DailyActiveUsers.ipynb或者新建的user-dau文件

导入需要的模块

import findspark
findspark.init()
from pyspark import SparkContext
from pyspark import SparkConf
from os.path import expanduser, join, abspath
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import *
import matplotlib.pyplot as plt
from pyspark.sql.types import StructField
import numpy as np
import pandas as pd
import matplotlib.dates as mdate

新建SparkSession,加载源文件并创建视图

warehouse_location = abspath('/user/hive/warehouse')
spark = SparkSession \
.builder \
.appName("Python Spark SQL Hive integration example") \
.config("spark.sql.warehouse.dir", warehouse_location) \
.master("spark://app-11:7077") \
.getOrCreate()
df = spark.read.format("CSV").option("header","true").\
option("timestampFormat ","yyyy-MM-dd'T'HH").\
schema("user_id int,item_id int,behavior_type int,user_geohash string,item_category int,create_time string").\
load("/user/hive/warehouse/test.db/tianchi/tianchi_mobile_recommend_train_user.csv")
df.createOrReplaceTempView("taobao")

启用pyarrow

spark.conf.set("spark.sql.execution.arrow.enabled", "true")

创建DataFrame,因为一个用户可能呈现了多次的访问,想看两个数据,一个是去重之后的用户量,一个是没有去重的用户访问累加情况。通过两个数据对比,得出某些结论。

统计去重和未去重情况下的用户量

sqlDF = spark.sql("SELECT date(to_timestamp(create_time)) as create_day, count(distinct user_id) as pbcfd, count(user_id) as pbcfa FROM taobao group by date(to_timestamp(create_time)) order by date(to_timestamp(create_time))")

重命名:date(to_timestamp(create_time)) as create_day,
去重重命名:count(distinct user_id) as pbcfd,
不去重重命名:count(user_id) as pbcfa
按日期分类:group by date(to_timestamp(create_time))
按日期排序:order by date(to_timestamp(create_time))

sqlDF.cache()
sqlDF.show()

转换为pandas可接受的类型

sqlDF.createOrReplaceTempView("pbcf")
sqlPandasDF = spark.sql("select cast(create_day as string)as create_day, pbcfd, pbcfa from pbcf")

类型转换:cast(create_day as string)as create_day

pbcfPandasDF = sqlPandasDF.toPandas()

展示:蓝色是去重了的,橘红色是没有去重的

对比显而易见,蓝色基本可以忽略不计。每天的活跃用户太少,可以看出淘宝发展的初期用户量少。

pbcfPandasDF.set_index('create_day').plot(kind='bar')

用折线图显示去重的

fig=plt.figure(figsize=(30,6))
ax=fig.add_subplot(1,1,1)
ax.xaxis.set_major_formatter(mdate.DateFormatter('%Y-%m-%d'))
date_series=pd.date_range(pbcfPandasDF['create_day'][0],pbcfPandasDF['create_day'][30],freq='D')
plt.xticks(date_series,rotation=45)
ax.plot(date_series,pbcfPandasDF.drop(['pbcfa'], axis=1).set_index('create_day'))
plt.show()

清理,关闭

spark.catalog.clearCache()
spark.stop()

再从Running页面进行关闭

结论:做数据分析并不是一拍脑袋就干的,是要根据数据情况结果进行说话,现有规划,从业务考虑做DAU是有用的,但是从实际数据看没有什么价值,因为数据用户量太小了,只能做这种非去重的图标。

9.2.4.4 任务3-客单量分析
9.2.4.4.1 介绍

反映客户的购买潜力和购买能力,平台非常关注的重要指标,只有更多人在平台购物,平台才能够发展和壮大,生成了每一天所有顾客的订单数量。从图可以看出双12确实有活动的效果,

9.2.4.4.2 上传文件

上传文件perBuy.ipynb到/home/hadoop/.local

9.2.4.4.3 分析

页面app-11:8888查看:

打开perBuy.ipynb,此处就不再新建了

导入模块

import findspark
findspark.init()
from pyspark import SparkContext
from pyspark import SparkConf
from os.path import expanduser, join, abspath
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import *
import matplotlib.pyplot as plt
from pyspark.sql.types import StructField
import numpy as np
import pandas as pd
import matplotlib.dates as mdate
from pyspark.sql.functions import udf, col

设置HDFS位置

warehouse_location = abspath('/user/hive/warehouse')

新建SparkSession,导入源文件,并创建视图

spark = SparkSession \
.builder \
.appName("Python Spark SQL Hive integration example") \
.config("spark.sql.warehouse.dir", warehouse_location) \
.master("spark://app-11:7077") \
.getOrCreate()
df = spark.read.format("CSV").option("header","true").\
option("timestampFormat ","yyyy-MM-dd'T'HH").\
schema("user_id int,item_id int,behavior_type int,user_geohash string,item_category int,create_time string").\
load("/user/hive/warehouse/test.db/tianchi/tianchi_mobile_recommend_train_user.csv")
df.createOrReplaceTempView("taobao")

启动pyarrow

spark.conf.set("spark.sql.execution.arrow.enabled", "true")

查询

sqlDF = spark.sql("SELECT date(to_timestamp(create_time)) as create_day,count(distinct user_id) as pbcfd,count(behavior_type) as buy FROM taobao where behavior_type=4 group by date(to_timestamp(create_time)) order by date(to_timestamp(create_time))")
sqlDF.cache()

重命名:date(to_timestamp(create_time)) as create_day,
去重重命名:count(distinct user_id) as pbcfd,
操作类型数重命名:count(behavior_type) as buy
按日期分类:group by date(to_timestamp(create_time))
按日期排序:order by date(to_timestamp(create_time))

sqlDF.show(30)

转换类型及格式并

结果反映了客单量,基本在1单多一点,双12爆发,但是也不到1.5单,客单量还不是那么高,当然现在的客单量也可能不高,因为现在是用户基数大了。

展示

fig=plt.figure(figsize=(30,6))
ax=fig.add_subplot(1,1,1)
ax.xaxis.set_major_formatter(mdate.DateFormatter('%Y-%m-%d'))
date_series=pd.date_range(pbcfPandasDF['create_day'][0],pbcfPandasDF['create_day'][30],freq='D')
plt.xticks(date_series,rotation=45)
pbcfPandasDF['perBuy'] = pbcfPandasDF['buy'] / pbcfPandasDF['pbcfd']//买数/去重后的用户数
ax.plot(date_series,pbcfPandasDF.drop(['pbcfd','buy'], axis=1).set_index('create_day'))
plt.show()

清理,关闭

spark.catalog.clearCache()
spark.stop()

再从Running页面进行关闭

9.2.5 商品分析

9.2.5.1 任务1-商品PV各环节转化率
9.2.5.1.1 介绍

计算商品各环节的转化率,包页面点击转化为收藏转化率,由收藏转化为购物车的转化率,由购物车转化为购买转化率有多少,整个页面点击到最终的购买转化率有多少。

数据分析里面称之为漏斗,漏斗分析比较常用,体现了营销最终效果,从商家看,虽然有很多访问,但是没有产生最终效果的访问时无效的,从转化率来看也是非常小,不到2%。投入了100分广告,只有2分广告才会产生效果,这样就可以在广告投入和收益之间做一个权衡,而不是将所有公司的整个宝都押在广告上。

为了画漏斗图,需要另外一个框架pyecharts,这个框架可以直接安装,过程很长,资源里面有自动安装脚本,可以直接运行脚本进行安装在python环境中。

9.2.5.1.2 安装pyecharts

资源文件pyecharts.zip上传到/tmp/spark中。

解压缩:unzip pyecharts.zip

进入目录:pyecharts

赋予脚本执行权限:chmod a+x installPyecharts.sh

执行脚本:./installPyecharts.sh

脚本执行了一序列操作:

#!/bin/sh
	pip install lml-0.0.2-py2.py3-none-any.whl
	pip install dukpy-0.2.2-cp37-cp37m-manylinux1_x86_64.whl
	pip install macropy3-1.1.0b2.tar.gz
	pip install javascripthon-0.10.tar.gz
	pip install pyecharts_javascripthon-0.0.6-py2.py3-none-any.whl
	pip install jupyter-echarts-pypkg-0.1.2.tar.gz
	unzip pyecharts-0.5.10.zip
	cd pyecharts-0.5.10
	python setup.py install
	cd ..
	pip install pyecharts-snapshot-0.1.10.zip
	
	# 如果没有安装成功,则以下命令会出现错误,否则没有输出
	python -c "from pyecharts import Funnel"
9.2.5.1.3 上传文件

上传文件ratio.ipynb到/home/haoop/.local目录

9.2.5.1.4 分析

通过app-11:8888查看

打开文件ratio.ipynb,此处不在新建用现有的资源文件

导入模块

import findspark
findspark.init()
from pyspark import SparkContext
from pyspark import SparkConf
from os.path import expanduser, join, abspath
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import *
import matplotlib.pyplot as plt
from pyspark.sql.types import StructField
import numpy as np
import pandas as pd
import matplotlib.dates as mdate
from pyspark.sql.functions import udf, col

设置HDFS位置

warehouse_location = abspath('/user/hive/warehouse')

新建SparkSession,导入源文件,并创建视图

spark = SparkSession \
.builder \
.appName("Python Spark SQL Hive integration example") \
.config("spark.sql.warehouse.dir", warehouse_location) \
.master("spark://app-11:7077") \
.getOrCreate()
df = spark.read.format("CSV").option("header","true").\
option("timestampFormat ","yyyy-MM-dd'T'HH").\
schema("user_id int,item_id int,behavior_type int,user_geohash string,item_category int,create_time string").\
load("/user/hive/warehouse/test.db/tianchi/tianchi_mobile_recommend_train_user.csv")
df.createOrReplaceTempView("taobao")

启动pyarrow
spark.conf.set(“spark.sql.execution.arrow.enabled”, “true”)

spark.conf.set("spark.sql.execution.arrow.enabled", "true")

统计商品的品类

spark.sql("SELECT count(distinct item_category) from taobao").show()

统计有多少种商品

spark.sql("SELECT count(distinct item_id) from taobao").show()

统计哪些商品比较突出,哪些比较差,将每一类商品购买数量打印出来。根据数据统计量排序。

spark.sql("SELECT item_id,behavior_type,count(*) as cnt from taobao group by item_id,behavior_type order by behavior_type desc,cnt desc").show()

group by item_id,behavior_type //商品及行为一起统计
order by behavior_type desc,cnt desc//降序排序

也就说最好的商品,30天卖出去了7个。

进一步分析卖得最多的商品,万一是一个用户一次购买了50个。这个数据总量还是小,大数据得出的结论更能作为决策的依据。

spark.sql("SELECT count(*) as cnt from taobao where item_id=303205878 and behavior_type=4").show()

分析转化率,时间戳是日期加小时,并不能够很好的理清工作发生的逻辑顺序,只能通过总体统计数据间的转化情况,通常能理解现有pv、然后才有fav等,相当于这个转化率是有意义的,如果没有pv,有fav,不太符合逻辑。统计算总体转化率。

查询商品及对应的动作。

spark.sql("""SELECT item_id,
(case behavior_type
when 1 then 'pv'
when 2 then 'fav'
when 3 then 'cart'
when 4 then 'buy'
END)b
FROM taobao""").show()

将相关动作的数据查询出来,一步步分析。

spark.sql("""SELECT item_id,
(case behavior_type
when 1 then 'pv'
when 2 then 'fav'
when 3 then 'cart'
when 4 then 'buy'
END)behavior,
count(*) as cnt
FROM taobao group by item_id,behavior_type order by behavior_type, cnt desc
""").show()

保存为DataFrame:

sqlDFRaw = spark.sql("""SELECT item_id,
(case behavior_type
when 1 then 'pv'
when 2 then 'fav'
when 3 then 'cart'
when 4 then 'buy'
END)behavior,
count(*) as cnt
FROM taobao group by item_id,behavior_type order by behavior_type, cnt desc
""")

然后再将行变列,将对应id同一个物品输出多个列,每列相应动作数。

sqlDF = sqlDFRaw.groupBy("item_id").pivot("behavior", ["pv", "fav", "cart", "buy"]).sum("cnt")
sqlDF.cache()
sqlDF.show()

为了验证上面生成数据的可靠性,做一次查询,确保数据可靠校验。

spark.sql("SELECT * FROM taobao where item_id=37274339").show()

查询null的可靠性

sqlDF.where(sqlDF.pv.isNull()).show()

清空上述分析过程

spark.catalog.clearCache()

看单个商品的Pv能否统计出来,

sqlDFRaw = spark.sql("""SELECT item_id,
(case behavior_type
when 1 then 'pv'
when 2 then 'fav'
when 3 then 'cart'
when 4 then 'buy'
END)behavior,
count(*) as cnt
FROM taobao group by item_id,behavior_type order by behavior_type, cnt desc
""")
sqlDFCol = sqlDFRaw.groupBy("item_id").pivot("behavior", ["pv", "fav", "cart", "buy"]).sum("cnt")
sqlDFCol.cache()
sqlDFCol.show()

从页面点击到最终购买转化率,该SQL语句是基于前面的分析过程写的。

sqlDFCol.createOrReplaceTempView("pbcf")
sqlDF = spark.sql("SELECT item_id, buy/pv as frac from pbcf where pv is not null and buy is not null order by frac desc")
sqlDF.show()

对最高的数据做进一步分析,确认真实可靠,可以看出只做了一次点击,购买了3次。

spark.sql("SELECT * FROM pbcf where item_id=307799636").show()

继续查看商品原始详情,同一个用户id,在同一个时间点购买同一个商品,从而得出该数据并不能很好的反映转化率情况。

spark.sql("SELECT * FROM taobao where item_id=307799636").show()

需要从整体分析转化率,分析单个商品的转化率从这个数据集上没法达到,数据集太小,需要总体分析。

现在开始总量的分析。

查询各个动作的总量。

sqlDF = spark.sql("SELECT count(pv) as cpv, count(cart) as ccart, count(fav) as cfav, count(buy) as cbuy from pbcf")
sqlDF.show()

将数据转换成pandasDataFrame,并打印。

from pyecharts import Funnel
pbcfPandasDF = sqlDF.toPandas()
pbcfPandasDF

画出整个过程转化率图,按照转化率从高向低排列

attrs = pd.Series(['pv','cart','fav','buy']).tolist()
attr_value = pd.Series([100,pbcfPandasDF.iloc[0]['ccart']/pbcfPandasDF.iloc[0]['cpv']100,pbcfPandasDF.iloc[0]['cfav']/pbcfPandasDF.iloc[0]['ccart']100,pbcfPandasDF.iloc[0]['cbuy']/pbcfPandasDF.iloc[0]['ccart']*100]).tolist()

funnel1 = Funnel("总体转化漏斗图一",width=800, height=400, title_pos='center')

funnel1.add(name="商品交易行环节", # 指定图例名称
attr=attrs, # 指定属性名称
value = attr_value, # 指定属性所对应的值
is_label_show=True, # 指定标签是否显示
label_formatter='{c}%', # 指定标签显示的格式
label_pos="inside", # 指定标签的位置
legend_orient='vertical', # 指定图例的方向
legend_pos='left', # 指定图例的位置
is_legend_show=True) # 指定图例是否显示

funnel1.render()
funnel1

清理,关闭

spark.catalog.clearCache()
spark.stop()

再从Running页面进行关闭

发表回复