欢迎来到广州华商学院大数据系DModel实训平台,

基于Flink的奥运会数据分析

责任编辑:bradley   发布时间:2022-05-16 16:43:52   

本实验针对奥运会数据进行分析,使用scala语言进行flink编程来处理数据,最后结果使用python进行可视化

一、实验环境

(1)Windows10
(2)Java:1.8
(3)Scala:2.11.12
(4)Python:3.x
(5)Python包:Matplotlib、Pandas
(6)所需工具:Maven、IntelloJ IDEA、Jupyter Notebook

二、项目创建说明

1.安装maven,在终端输入如下命令创建scala版flink项目

mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-scala -DarchetypeVersion=1.13.0 -DgroupId=org.apache.flink.quickstart -DartifactId=flink-scala-project -Dversion=0.1 -Dpackage=org.apache.flink.quickstart -DinteractiveMode=false 

2.打开IntelliJ IDEA配置项目的环境

(1)添加相关依赖包

打开第1步所创建的pom.xml文件

选择open as project,然后在pom.xml中添加如下内容(table API和csv格式相关的依赖包)并更新


 点击 pom.xml ,查看代码

(2)设置好与pom.xml中相对应的java版本和scala版本

点击file,然后点击project structure

在poject SDK中选择JDK1.8

在module SDK中选择JDK1.8

在SDKs中选择JDK1.8

在global libraries中选择scala-sdk-2.11.12

(3)创建scala文件,并配置运行环境

新建一个名为”Homework”的scala文件,在这个scala文件内编写代码,最后运行代码,注意在运行代码时需要进行下面所示的一些运行配置

在右上角中点击edit configurations

在modify options中勾选include dependencies with “provided” scope并保存,接下来就可以直接运行所写的scala代码了

三、数据集说明

本次作业所使用的数据集为olympic-athletes.csv文件(从百度网盘下载数据集,提取码:ziyu),数据集中包含了2000年到2012年一部分(一共153名)选手的信息。
文件内每行的内容及类型如下:
第1列为运动员名字,格式为String;
第2列为国家,格式为String
第3列为年份,格式为Int
第4列为运动项目,格式为String
第5列为获得的金牌数,格式为Int
第6列为获得的银牌数,格式为Int
第7列为获得的铜牌数,格式为Int
第8列为总共获得的奖牌数,格式为Int
部分数据如下图所示

四、步骤概述

首先在IDEA中使用scala利用flink对数据集进行处理,统计每个国家的总人数,每个运动项目的总人数,每个国家所获得的的总奖牌数,并分别输出数据到3个csv文件people_per_country.csv,people_per_Game.csv,total_per_country.csv。然后在jupyter notebook中使用python读取这3个csv文件并进行可视化。

五、代码详解

实验完整工程文件从百度云盘获取(提取码:ziyu),下面分块介绍具体代码作用。

1、scala代码数据处理(位于文件夹内的flink-scala-project\src\main\scala\org\apache\flink\quickstart\Homework.scala中)

导入相关的包

import org.apache.flink.api.common.functions.AggregateFunction

import org.apache.flink.api.scala._

import org.apache.flink.streaming.api.TimeCharacteristic

import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

import org.apache.flink.streaming.api.windowing.time.Time

import org.apache.flink.streaming.api.windowing.windows.TimeWindow

import org.apache.flink.table.api.DataTypes

import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema}

import org.apache.flink.util.Collector

import java.util.Calendar


在数据处理过程中使用了flink提供的datastream API和tableAPI,因为需要进行批处理但是使用了datastream API,因此需要使用窗口window的概念将流处理转化为批处理(如果不使用window,则每接收到一个数据就需要进行处理并得到一个输出;当使用了window,则可以对这个窗口内的所有数据进行处理后再得到一个输出)。Window可以使用时间窗口和计数窗口,这里采用时间窗口进行数据处理,所用的时间又分为processing时间和event时间,这里采用了event时间进行处理。当使用event时间时,还需要用到watermark的概念。因此创建steam环境和table环境的代码如下(在steam环境中设置event时间,并每1秒发送一个watermark):


val env = StreamExecutionEnvironment.getExecutionEnvironment

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

env.getConfig.setAutoWatermarkInterval(1000L)

val tEnv = StreamTableEnvironment.create(env)

接下来使用table API来读取olympic-athletes.csv文件,并设置每列数据的名称以便于后续进行数据处理,读取文件得到的表数据取名为”my_table”,注意运行代码时文件路径需要进行修改:

val schema = new Schema()

  .field("player", DataTypes.STRING())

  .field("Country", DataTypes.STRING())

  .field("Year", DataTypes.INT())

  .field("Game", DataTypes.STRING())

  .field("Gold", DataTypes.INT())

  .field("Silver", DataTypes.INT())

  .field("Bronze", DataTypes.INT())

  .field("Total", DataTypes.INT())

tEnv.connect(new FileSystem().path("C:\\Users\\60549\\Desktop\\杨志鹏-31520201153900\\olympic-athletes.csv"))

  .withFormat(new Csv().deriveSchema())

  .withSchema(schema)

  .createTemporaryTable("my_table")

val table = tEnv.from("my_table")

在得到表数据table后,需要将table转换为DataStream类型,然后就可以使用flink提供的datastream API进行数据处理,为了方便处理,首先定义一个case class

case class Athlete(player:String, Country:String, Years:Int, Game:String, Gold:Int, Silver:Int, Bronze:Int, Total:Int)

然后将table数据转换为DataStream类型数据

val athletes = tEnv.toAppendStream[Athlete](table)

上面得到的athletes的类型为DataStream[Athlete],由于后面时间窗口使用的event时间需要数据源提供时间戳,为了方便再建立一个时间戳属性的case class:

case class AthleteWithTime(player:String, Country:String, Years:Int, Game:String, Gold:Int, Silver:Int, Bronze:Int, Total:Int, Time:Long)

接下来是构建一个带有时间戳的数据流:

val athletesWithTime= athletes.map(player=>new AthleteWithTime(player.player, player.Country,player.Years,player.Game,player.Gold,player.Silver,player.Bronze,player.Total,Calendar.getInstance.getTimeInMillis)).assignTimestampsAndWatermarks(new TimeAssigner)

在上面的代码中,最后给带有时间戳的数据流定义了(定义在TimeAssigner里)如何从数据流中提取时间戳,以及watermark的生成方式, TimeAssigner的具体代码如下:

class TimeAssigner

  extends BoundedOutOfOrdernessTimestampExtractor[AthleteWithTime](Time.seconds(5)) {

  override def extractTimestamp(r: AthleteWithTime): Long = r.Time

}

上面代码中使用了flink提供的BoundedOutOfOrdernessTimestampExtractor,这可以使得乱序的数据和稍微延迟(5秒)的数据得到处理。在得到数据源后,就可以对数据进行处理了,首先统计每个国家的总人数:

val peoplePerCountry = athletesWithTime

  .map(player => (player.Country,1))

  .keyBy(0)

  .timeWindow(Time.seconds(5))

  .sum(1)


上面代码中使用了时间窗口的概念,将流处理转为批处理,时间窗口的大小决定了所能处理的批的大小,为了将整个数据集都得到批处理,这里将时间窗口的大小设置得足够大(5秒)。然后是统计每个运动项目的总人数,这里采用reduce函数实现sum的功能(reduce函数的输入为两个相同类型的数据,输出为一个相同类型的数据,因此利用reduce可以实现一些较简单的数据滚动处理比如sum):

val peoplePerGame = athletesWithTime

  .map(player => (player.Game,1))

  .keyBy(0)

  .timeWindow(Time.seconds(5))

  .reduce((a,b)=>(a._1, a._2 + b._2))

最后是统计每个国家获得的总奖牌数,这里采用aggregate来实现sum的功能(aggregate函数是reduce的推广,它的输出类型可以和输入类型不相同,因此可以利用aggregate实现比较复杂的数据滚动处理):

val totalPerCountry = athletesWithTime

  .map(player => (player.Country, player.Total))

  .keyBy(0)

  .timeWindow(Time.seconds(5))

  .aggregate(new MySum)

上面代码中的MySum首先需要定义一个存放中间处理结果的accumulator,然后再定义由输入数据得到accumulator的方式,最后从最终的accumulator提取出完整的结果,完整定义如下所示(由于只有session时间窗口才需要实现merge函数,而这里使用的窗口是tumbling时间窗口,所以merge函数只要返回null即可):


class MySum

  extends AggregateFunction[(String, Int), (String, Int), (String, Int)] {

  override def createAccumulator() = {

    ("", 0)

  }

  override def add(in: (String, Int), acc: (String, Int)) = {

    (in._1, in._2 + acc._2)

  }

  override def getResult(acc: (String, Int)) = {

    (acc._1, acc._2)

  }

  override def merge(acc1: (String, Int), acc2: (String, Int)) = {

    null

  }

}

在得到处理完的流数据后,将这些流数据输出到csv格式中去(注意运行代码时文件路径需要进行修改),由于flink是并行处理,所以最后输出到一个csv文件时需要设置并行度为1,这里注意因为在作业文件夹中已经存在这三个csv文件,因此重新运行时需要先删除原来的csv文件,再运行代码得到这三个csv文件:

peoplePerCountry.writeAsCsv("C:\\Users\\60549\\Desktop\\杨志鹏-31520201153900\\people_per_country.csv").setParallelism(1)

peoplePerGame.writeAsCsv("C:\\Users\\60549\\Desktop\\杨志鹏-31520201153900\\people_per_Game.csv").setParallelism(1)

totalPerCountry.writeAsCsv("C:\\Users\\60549\\Desktop\\杨志鹏-31520201153900\\total_per_country.csv").setParallelism(1)

env.execute()


这里将上面所述的完整scala代码进行展示:


import org.apache.flink.api.common.functions.AggregateFunction

import org.apache.flink.api.scala._

import org.apache.flink.streaming.api.TimeCharacteristic

import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

import org.apache.flink.streaming.api.windowing.time.Time

import org.apache.flink.streaming.api.windowing.windows.TimeWindow

import org.apache.flink.table.api.DataTypes

import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema}

import org.apache.flink.util.Collector

 

import java.util.Calendar

 

object Homework {

  def main(args: Array[String]) {

 

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    env.getConfig.setAutoWatermarkInterval(1000L)

    val tEnv = StreamTableEnvironment.create(env)

    val schema = new Schema()

      .field("player", DataTypes.STRING())

      .field("Country", DataTypes.STRING())

      .field("Year", DataTypes.INT())

      .field("Game", DataTypes.STRING())

      .field("Gold", DataTypes.INT())

      .field("Silver", DataTypes.INT())

      .field("Bronze", DataTypes.INT())

      .field("Total", DataTypes.INT())

    tEnv.connect(new FileSystem().path("C:\\Users\\60549\\Desktop\\杨志鹏-31520201153900\\olympic-athletes.csv"))

      .withFormat(new Csv().deriveSchema())

      .withSchema(schema)

      .createTemporaryTable("my_table")

 

    val table = tEnv.from("my_table")

    val athletes = tEnv.toAppendStream[Athlete](table)

    val athletesWithTime= athletes.map(player=>new AthleteWithTime(player.player, player.Country,player.Years,

      player.Game,player.Gold,player.Silver,player.Bronze,player.Total,Calendar.getInstance.getTimeInMillis))

      .assignTimestampsAndWatermarks(new TimeAssigner)

 

    val peoplePerCountry = athletesWithTime

      .map(player => (player.Country,1))

      .keyBy(0)

      .timeWindow(Time.seconds(5))

      .sum(1)

 

    val peoplePerGame = athletesWithTime

      .map(player => (player.Game,1))

      .keyBy(0)

      .timeWindow(Time.seconds(5))

      .reduce((a,b)=>(a._1, a._2 + b._2))

 

    val totalPerCountry = athletesWithTime

      .map(player => (player.Country, player.Total))

      .keyBy(0)

      .timeWindow(Time.seconds(5))

      .aggregate(new MySum)

 

    peoplePerCountry.writeAsCsv("C:\\Users\\60549\\Desktop\\杨志鹏-31520201153900\\people_per_country.csv").setParallelism(1)

    peoplePerGame.writeAsCsv("C:\\Users\\60549\\Desktop\\杨志鹏-31520201153900\\people_per_Game.csv").setParallelism(1)

    totalPerCountry.writeAsCsv("C:\\Users\\60549\\Desktop\\杨志鹏-31520201153900\\total_per_country.csv").setParallelism(1)

    env.execute()

 

  }

}

 

case class Athlete(player:String, Country:String, Years:Int, Game:String, Gold:Int, Silver:Int, Bronze:Int, Total:Int)

case class AthleteWithTime(player:String, Country:String, Years:Int, Game:String, Gold:Int, Silver:Int, Bronze:Int, Total:Int, Time:Long)

 

class TimeAssigner

  extends BoundedOutOfOrdernessTimestampExtractor[AthleteWithTime](Time.seconds(5)) {

 

  override def extractTimestamp(r: AthleteWithTime): Long = r.Time

 

}

 

class MySum

  extends AggregateFunction[(String, Int), (String, Int), (String, Int)] {

  override def createAccumulator() = {

    ("", 0)

  }

  override def add(in: (String, Int), acc: (String, Int)) = {

    (in._1, in._2 + acc._2)

  }

  override def getResult(acc: (String, Int)) = {

    (acc._1, acc._2)

  }

  override def merge(acc1: (String, Int), acc2: (String, Int)) = {

    null

  }

}


2、python代码可视化(位于文件夹的” python代码可视化.ipynb”中)

在处理完数据并得到相应的csv文件后,可以打开jupyter notebook,写入python代码读取相关的csv文件并进行可视化。在终端输入jupyter-lab打开jupyter notebook。
首先导入相关的包,并读取csv文件:

import pandas as pd

import matplotlib.pyplot as plt

#当python文件和csv文件路径不同时要修改运行路径

people_per_country = pd.read_csv("people_per_country.csv", header=None, names=["country", "people"], index_col=0)

people_per_game = pd.read_csv("people_per_game.csv", header=None, names=["game", "people"], index_col=0)

total_per_country = pd.read_csv("total_per_country.csv", header=None, names=["country", "total"], index_col=0)


(1)画出每个国家总人数条形图和饼图:

people_per_country.plot.bar()

people_per_country.plot.pie(subplots=True, figsize=(15,15),rotatelabels=True,labeldistance=0.8,radius=0.9,fontsize=11)

(2)画出每个运动项目参加总人数的条形图和饼图:

people_per_game.plot.bar()

people_per_game.plot.pie(subplots=True, figsize=(15,15),rotatelabels=True,labeldistance=0.8,radius=0.5,fontsize=20)

(3)画出每个国家获得奖牌总数的条形图和饼图:

total_per_country.plot.bar()

total_per_country.plot.pie(subplots=True, figsize=(17,17),rotatelabels=True,labeldistance=0.8,radius=0.9,fontsize=10)


☆ 《Flink原理与应用》课程空间