欢迎来到广州华商学院大数据系DModel实训平台,

基于Flink的美国县域信息分析

责任编辑:bradley   发布时间:2022-05-18 16:46:58   

本实验针对美国县域信息数据进行分析,使用scala语言进行flink编程来处理数据,最后结果使用python进行可视化

本实验涉及到的所有数据集和代码,可以从百度网盘下载。提取码:ziyu。

一、 实验环境

  • Windows 10 家庭中文版

  • IntelliJ IDEA 2019.3(Ultimate Edition)

  • apache-maven-3.8.1

  • python 3.8.7

  • flink 1.11.2

二、 数据集

美国县域信息(US County information),总共有3094行信息。每行有6个属性字段。如下所示:
– County:指美国的郡县名,比如有Autauga(奥陶加县)、Baldwin(鲍德温县)等.
– State:指对应郡县所属的州,比如有Alabama(亚拉巴马州)、Alaska(阿拉斯加州)等。
– FIPS Code:联邦信息处理标准(FIPS)现称为联邦信息处理系列,是由美国国家标准与技术研究院(NIST)指定的数字代码。
– Population:记录当前郡县人口数量。
– Area:记录当前郡县的土地面积。单位为平方英里。
– Density:记录当前郡县的人口密度。可通过人口数量与土地面积计算得出。单位为人/每平方英里。
数据具体样式如下所示:

下载地址:https://github.com/balsama/us_counties_data

三、使用flink进行数据处理与分析

为了方便数据的存储,首先定义一个案例类存储数据集中的数据。

//根据表格内容 定义一个POJOs存取内容

//字段包括county:郡县名、state:州名、FIPS Code:区域编码、population:人口数量、area:面积、Density:人口密度

case class countyInfo(county:String,state:String,FIPS:String,

population:Int,area:Int,Density:Int)

1.计算某一州面积最大的郡县

(1)流程图如下所示(以Alaska州为例)

由于只计算Alaska州各郡县面积中的最大者,首先我们要对数据进行过滤,我们使用filter算子,只保留state为Alaska的数据。接着在使用map算子去掉多余的字段(比如FIPS Code、population等)。然后使用maxBy算子指定面积字段得出面积最大者。最后再次使用map算子得出郡县名字段。
在本例中Alaska州面积最大的郡县为Unorganized Borough。与下图维基百科所查一致。


(2)代码如下

//求一个州里面积最大的郡县

def getMaxAreaCountyInState(StateName:String,inputPath:String) = {

  // 设置批执行环境

  val env = ExecutionEnvironment.getExecutionEnvironment

 

  // 得到输入数据

  val input = env.readCsvFile[countyInfo](inputPath,ignoreFirstLine = true)//从CSV中读取数据,并忽略第一行属性字段

 

  // 统计该州面积最大的郡

  val county = input.filter( _.state == StateName)//过滤数据,只留下该州的信息

      .map(x => (x.area,x.county,x.state))//保留有用字段

      .maxBy(0)//取出面积最大的元组

      .map(x => x._2)//只保留名称字段

 

  // 执行并输出结果

  println(StateName+"面积最大的县是")

  county.print()

 

2.统计各州总人数、总面积、人口密度

(1)流程图如下所示

由于要统计的数据为各州的总人数、总面积及人口密度。首先我们先使用map算子去掉一些无用的字段。接着使用groupBy算子指定以州字段进行分组。然后再使用reduce算子计算州的总人数和总面积。最后使用map算子额外映射出一个人口密度字段,值为总人数除以总面积,单位为人/每平方英里,保留两位小数。
(2)代码如下

//统计州信息

def getStateInformation(inPath:String,outPath:String)={

  // 设置批执行环境

  val env = ExecutionEnvironment.getExecutionEnvironment

 

  // 得到输入数据

 val input = env.readCsvFile[countyInfo](inPath,ignoreFirstLine = true)//从CSV中读取数据,并忽略第一行属性字段

 

  // 统计每州的数据

  val state = input.map(x => (x.population,x.area,x.state)) //去掉一些无用的字段,比如FIPS Code

      .groupBy(2)                                   //按州来聚合数据

      .reduce((x,y)=>(x._1+y._1,x._2+y._2,x._3))            //计算每州的总人口,和总面积

      .map((x)=>(x._1,x._2,(x._1.asInstanceOf[Float]/x._2.asInstanceOf[Float]).formatted("%.2f"),x._3))//计算每州的人口密度保留两位小数

      .setParallelism(1)

 

  // 执行并输出结果

  state.writeAsCsv(outPath)

  state.print()

}

}

3.计算统计数据Top-10的州

(1)流程图如下所示(以统计面积前10的州为例)

由于要计算面积前10的州,可以使用3.2节得出的结果文件state.csv作为输入,首先使用map算子取出我们需要统计的字段(面积、人口或者人口密度)及州名,接着使用sortPartition算子并指定排序字段及排序方式进行排序。最后使用first(n)算子取出前n条数据。
(2)代码如下

//计算统计数据top10的州

def getStateTop10(relativePath: String) = {

  // 设置批执行环境

  val env = ExecutionEnvironment.getExecutionEnvironment

  // 得到输入数据

  val input:DataSet[(Int,Int,Float,String)] =   env.readCsvFile(relativePath+"state.csv")

 

  //计算每种数据的top10

  //1.计算人口总数top10

  val population = input.map((x)=>(x._1,x._4)) //取出字段

    .sortPartition(0,Order.DESCENDING)  //按人数降序排序

    .setParallelism(1)

    .first(10)  //取排序后前10的州

  //2.计算面积top10

  val area =  input.map((x)=>(x._2,x._4)) //取出字段

    .sortPartition(0,Order.DESCENDING)  //按面积降序排序

    .setParallelism(1)

    .first(10)  //取排序后前10的州

  //3.计算人口密度top10

  val density =  input.map((x)=>(x._3,x._4)) //取出字段

    .sortPartition(0,Order.DESCENDING)  //按人口密度降序排序

    .setParallelism(1)

    .first(10)  //取排序后前10的州

 

  // 执行并输出结果

  population.writeAsCsv(relativePath+"populationTop10.csv")

  population.print()

  area.writeAsCsv(relativePath+"areaTop10.csv")

  area.print()

  density.writeAsCsv(relativePath+"densityTop10.csv")

  density.print()

}

 

四、数据可视化

1.美国人口密度散点图

(1)代码如下

def drawUsDensityMap(inUrl,outUrl):

    #各州对应缩写的字典

    state_dict = {"Alabama":"AL","Alaska":"AK","Arizona":"AZ","Arkansas":"AR","California":"CA",

                  "Colorado":"CO","Connecticut":"CT","Delaware":"DE","Florida":"FL","Georgia":"GA",

                  "Hawaii":"HI","Idaho":"ID","Illinois":"IL","Indiana":"IN","Iowa":"IA",

                  "Kansas":"KS","Kentucky":"KY","Louisiana":"LA","Maine":"ME","Maryland":"MD",

                  "Massachusetts":"MA","Michigan":"MI","Minnesota":"MN","Mississippi":"MS","Missouri":"MO",

                  "Montana":"MT","Nebraska":"NE","Nevada":"NV","New Hampshire":"NH","New Jersey":"NJ",

                  "New Mexico":"NM","New York":"NY","North Carolina":"NC","North Dakota":"ND","Ohio":"OH",

                  "Oklahoma":"OK","Oregon":"OR","Pennsylvania":"PA","Rhode Island":"R","South Carolina":"SC",

                  "South Dakota":"SD","Tennessee":"TN","Texas":"TX","Utah":"UT","Vermont":"VT",

                  "Virginia":"VA","Washington":"WA","West Virginia":"WV","Wisconsin":"WI","Wyoming":"WY"

                  }

    try:

        register_url("https://echarts-maps.github.io/echarts-countries-js/")

    except Exception:

        import ssl

        ssl._create_default_https_context = ssl._create_unverified_context

        register_url("https://echarts-maps.github.io/echarts-countries-js/")

    df = pd.read_csv(inUrl,names=['population','area','density','state'])#从文件中读取数据

    density = df['density'].tolist()                             

    state = df['state'].tolist()

    #全称和缩写的映射

    for i in range(len(state)):

        state[i] = state_dict[state[i]]

    list = [[state[i],density[i]] for i in range(len(state))]  # 合并两个list为一个list

    maxDensity = max(density)                                         # 计算最大密度,用作图例的上限

 

    geo = (                                                          # 添加坐标点

        Geo(init_opts=opts.InitOpts(width = "1200px", height = "600px", bg_color = '#EEEEE8'))

            .add_schema(maptype="美国",itemstyle_opts=opts.ItemStyleOpts(color="#323c48", border_color="#111"))

            .add_coordinate('WA',-120.04,47.56).add_coordinate('OR',-120.37,43.77).add_coordinate('CA',-120.44,36.44).add_coordinate('AK',-122.00,28.46)

            .add_coordinate('ID',-114.08,43.80).add_coordinate('NV',-116.44,39.61).add_coordinate('MT',-109.42,47.13).add_coordinate('WY',-107.29,42.96)

            .add_coordinate('UT',-111.19,39.35).add_coordinate('AZ',-111.70,34.45).add_coordinate('HI',-105.25,28.72).add_coordinate('CO',-105.52,38.89)

            .add_coordinate('NM',-106.11,34.45).add_coordinate('ND',-100.22,47.53).add_coordinate('SD',-100.52,44.72).add_coordinate('NE',-99.64,41.65)

            .add_coordinate('KS',-98.53,38.43).add_coordinate('OK',-97.13,35.42).add_coordinate('TX',-98.16,31.03).add_coordinate('MN',-94.26,46.02)

            .add_coordinate('IA',-93.60,42.09).add_coordinate('MO',-92.57,38.48).add_coordinate('AR',-92.43,34.69).add_coordinate('LA',-92.49,31.22)

            .add_coordinate('WI',-89.55,44.25).add_coordinate('MI',-84.62,43.98).add_coordinate('IL',-89.11,40.20).add_coordinate('IN',-86.17,40.08)

            .add_coordinate('OH',-82.71,40.31).add_coordinate('KY',-84.92,37.44).add_coordinate('TN',-86.32,35.78).add_coordinate('MS',-89.63,32.66)

            .add_coordinate('AL',-86.68,32.53).add_coordinate('FL',-81.68,28.07).add_coordinate('GA',-83.22,32.59).add_coordinate('SC',-80.65,33.78)

            .add_coordinate('NC',-78.88,35.48).add_coordinate('VA',-78.24,37.48).add_coordinate('WV',-80.63,38.62).add_coordinate('PA',-77.57,40.78)

            .add_coordinate('NY',-75.22,43.06).add_coordinate('MD',-76.29,39.09).add_coordinate('DE',-75.55,39.09).add_coordinate('NJ',-74.47,40.03)

            .add_coordinate('VT',-72.70,44.13).add_coordinate('NH',-71.64,43.59).add_coordinate('MA',-72.09,42.33).add_coordinate('CT',-72.63,41.67)

            .add_coordinate('RI',-71.49,41.64).add_coordinate('ME',-69.06,45.16).add_coordinate('PR',-75.37,26.42).add_coordinate('DC',-77.04,38.90)

            .add("Density", list,symbol_size = 10,itemstyle_opts = opts.ItemStyleOpts(color="red"))

            .set_series_opts(label_opts=opts.LabelOpts(is_show=False),type='effectScatter')

            .set_global_opts(

            title_opts=opts.TitleOpts(title="美国人口密度图"),

            visualmap_opts=opts.VisualMapOpts(max_ = maxDensity,is_piecewise=True),

        )

            .render(outUrl)

    )

 

(2)结果如下:

2.美国人口前10的州柱状图

(1)代码如下

def drawTop10PopulationState(inUrl):

    #解决中文乱码

    plt.rcParams['font.sans-serif']=['SimHei']

    plt.rcParams['axes.unicode_minus'] = False

    #从文件中读取数据

    df = pd.read_csv(inUrl,names=['population','state'])

    population = df['population'].tolist()

    state = df['state'].tolist()

    #画图

    # 设置x,y轴标签

    plt.xlabel("人口数量")

    plt.ylabel("州名")

    plt.barh(state,population,facecolor='tan',height=0.5,edgecolor='r',alpha=0.6)

    plt.title("美国人口数量top10的州")

plt.show()

 

(2)结果如下:

3.美国各州基于面积的词云图

(1)代码如下

def drawAreaWordCloud(inUrl):

    #1从文件中读取数据

    df = pd.read_csv(inUrl,names=['population','area','density','state'])

    area = df['area'].tolist()

    state = df['state'].tolist()

    #2.将面积和州名建立成字典

    dict = {}

    for i in range(len(state)):

        dict[state[i]]=area[i]

    #3.生成词云

    wc = wordcloud.WordCloud(

        background_color='white',  # 背景颜色

        width=800,

        height=600,

        max_font_size=50,  # 字体大小

        min_font_size=10,

        mask=plt.imread('C:\\Users\\91541\\Pictures\\Saved Pictures\\map.jpg'),  # 背景图片

        max_words=1000 )

    wc.generate_from_frequencies(dict)

    wc.to_file('D:\\Area.jpg')  # 4.图片保存

(2)结果如下:

4.美国人口密度前10的州饼状图

(1)代码如下

def drawDensityPie(inUrl):

    #解决中文乱码

    plt.rcParams['font.sans-serif']=['SimHei']

    plt.rcParams['axes.unicode_minus'] = False

    #从文件中读取数据

    df = pd.read_csv(inUrl,names=['density','state'])

    density = df['density'].tolist()

    state = df['state'].tolist()

    #画图

    plt.axes(aspect=1)

    plt.pie(x=density,labels=state,autopct="%0f%%",shadow=True)

    plt.title("美国人口密度前10的州饼状图")

plt.show()

(2)结果如下:

☆ 《Flink原理与应用》课程空间