一、概述
1、需求分析
数据格式:日期 用户 搜索词 城市 平台 版本需求:1、筛选出符合查询条件(城市、平台、版本)的数据2、统计出每天搜索uv排名前3的搜索词3、按照每天的top3搜索词的uv搜索总次数,倒序排序4、将数据保存到hive表中###数据 keyword.txt
2018-10-1:leo:water:beijing:android:1.0
2018-10-1:leo1:water:beijing:android:1.0 2018-10-1:leo2:water:beijing:android:1.0 2018-10-1:jack:water:beijing:android:1.0 2018-10-1:jack1:water:beijing:android:1.0 2018-10-1:leo:seafood:beijing:android:1.0 2018-10-1:leo1:seafood:beijing:android:1.0 2018-10-1:leo2:seafood:beijing:android:1.0 2018-10-1:leo:food:beijing:android:1.0 2018-10-1:leo1:food:beijing:android:1.0 2018-10-1:leo2:meat:beijing:android:1.0 2018-10-2:leo:water:beijing:android:1.0 2018-10-2:leo1:water:beijing:android:1.0 2018-10-2:leo2:water:beijing:android:1.0 2018-10-2:jack:water:beijing:android:1.0 2018-10-2:leo1:seafood:beijing:android:1.0 2018-10-2:leo2:seafood:beijing:android:1.0 2018-10-2:leo3:seafood:beijing:android:1.0 2018-10-2:leo1:food:beijing:android:1.0 2018-10-2:leo2:food:beijing:android:1.0 2018-10-2:leo:meat:beijing:android:1.0####
1、如果文本案例使用的是txt编辑,将文本保存ANSI格式,否则在groupByKey的时候,第一行默认会出现一个空格,分组失败。
2、文本的最后禁止出现空行,否则在split的时候会报错,出现数组越界的错误;
2、思路
1、针对原始数据(HDFS文件),获取输入的RDD2、使用filter算子,去针对输入RDD中的数据,进行数据过滤,过滤出符合查询条件的数据。 2.1 普通的做法:直接在fitler算子函数中,使用外部的查询条件(Map),但是,这样做的话,是不是查询条件Map, 会发送到每一个task上一份副本。(性能并不好) 2.2 优化后的做法:将查询条件,封装为Broadcast广播变量,在filter算子中使用Broadcast广播变量进行数据筛选。 3、将数据转换为“(日期_搜索词, 用户)”格式,然后呢,对它进行分组,然后再次进行映射,对每天每个搜索词的搜索用户进行去重操作,并统计去重后的数量,即为每天每个搜索词的uv。最后,获得“(日期_搜索词, uv)”4、将得到的每天每个搜索词的uv,RDD,映射为元素类型为Row的RDD,将该RDD转换为DataFrame5、将DataFrame注册为临时表,使用Spark SQL的开窗函数,来统计每天的uv数量排名前3的搜索词,以及它的搜索uv,最后获取,是一个DataFrame6、将DataFrame转换为RDD,继续操作,按照每天日期来进行分组,并进行映射,计算出每天的top3搜索词的搜索uv的总数,然后将uv总数作为key,将每天的top3搜索词以及搜索次数,拼接为一个字符串7、按照每天的top3搜索总uv,进行排序,倒序排序8、将排好序的数据,再次映射回来,变成“日期_搜索词_uv”的格式9、再次映射为DataFrame,并将数据保存到Hive中即可
二、java实现
package cn.spark.study.sql;import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaPairRDD;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.api.java.function.FlatMapFunction;import org.apache.spark.api.java.function.Function;import org.apache.spark.api.java.function.PairFunction;import org.apache.spark.broadcast.Broadcast;import org.apache.spark.sql.DataFrame;import org.apache.spark.sql.Row;import org.apache.spark.sql.RowFactory;import org.apache.spark.sql.SQLContext;import org.apache.spark.sql.hive.HiveContext;import org.apache.spark.sql.types.DataTypes;import org.apache.spark.sql.types.StructField;import org.apache.spark.sql.types.StructType;import scala.Tuple2;import java.util.*;public class DailyTop3Keyword { @SuppressWarnings("deprecation") public static void main(String[] args) { SparkConf conf = new SparkConf(); JavaSparkContext jsc = new JavaSparkContext(conf); SQLContext sqlContext = new HiveContext(jsc.sc()); // 伪造数据(这些数据可以来自mysql数据库) final HashMap> queryParaMap = new HashMap >(); queryParaMap.put("city", Arrays.asList("beijing")); queryParaMap.put("platform", Arrays.asList("android")); queryParaMap.put("version", Arrays.asList("1.0", "1.2", "2.0", "1.5")); // 将数据进行广播 final Broadcast >> queryParamMapBroadcast = jsc.broadcast(queryParaMap); // 针对HDFS文件中的日志,获取输入RDD JavaRDD rowRDD = jsc.textFile("hdfs://spark1:9000/spark-study/keyword.txt"); // filter算子进行过滤 JavaRDD filterRDD = rowRDD.filter(new Function () { private static final long serialVersionUID = 1L; @Override public Boolean call(String log) throws Exception { // 切割原始日志,获取城市、平台和版本 String[] logSplit = log.split(":"); String city = logSplit[3]; String platform = logSplit[4]; String version = logSplit[5]; // 与查询条件进行比对,任何一个条件,只要该条件设置了,且日志中的数据没有满足条件 // 则直接返回false,过滤掉该日志 // 否则,如果所有设置的条件,都有日志中的数据,则返回true,保留日志 HashMap > queryParamMap = queryParamMapBroadcast.value(); List cities = queryParamMap.get("city"); if (!cities.contains(city) && cities.size() > 0) { return false; } List platforms = queryParamMap.get("platform"); if (!platforms.contains(platform)) { return false; } List versions = queryParamMap.get("version"); if (!versions.contains(version)) { return false; } return true; } }); // 过滤出来的原始日志,映射为(日期_搜索词,用户)格式 JavaPairRDD dateKeyWordUserRDD = filterRDD.mapToPair(new PairFunction () { private static final long serialVersionUID = 1L; @Override public Tuple2 call(String log) throws Exception { String[] logSplit = log.split(":"); String date = logSplit[0]; String user = logSplit[1]; String keyword = logSplit[2]; return new Tuple2 (date + "_" + keyword, user); } }); // 进行分组,获取每天每个搜索词,有哪些用户搜索了(没有去重) JavaPairRDD > dateKeywordUsersRDD = dateKeyWordUserRDD.groupByKey(); List >> collect1 = dateKeywordUsersRDD.collect(); for (Tuple2 > tuple2 : collect1) { System.out.println("进行分组,获取每天每个搜索词,有哪些用户搜索了(没有去重)" + tuple2._2); System.out.println(tuple2); } // 对每天每个搜索词的搜索用户 去重操作 获得前uv JavaPairRDD dateKeywordUvRDD = dateKeywordUsersRDD.mapToPair (new PairFunction >, String, Long>() { private static final long serialVersionUID = 1L; @Override public Tuple2 call(Tuple2 > dataKeywordUsers) throws Exception { String dateKeyword = dataKeywordUsers._1; Iterator users = dataKeywordUsers._2.iterator(); // 对用户去重 并统计去重后的数量 List distinctUsers = new ArrayList (); while (users.hasNext()) { String user = users.next(); if (!distinctUsers.contains(user)) { distinctUsers.add(user); } } // 获取uv long uv = distinctUsers.size(); // 日期_搜索词,用户个数 return new Tuple2 (dateKeyword, uv); } }); List > collect2 = dateKeywordUvRDD.collect(); for (Tuple2 stringLongTuple2 : collect2) { System.out.println("对每天每个搜索词的搜索用户 去重操作 获得前uv"); System.out.println(stringLongTuple2); } // 将每天每个搜索词的uv数据,转换成DataFrame JavaRDD dateKeywordUvRowRDD = dateKeywordUvRDD.map(new Function
, Row>() { private static final long serialVersionUID = 1L; @Override public Row call(Tuple2 dateKeywordUv) throws Exception { String date = dateKeywordUv._1.split("_")[0]; String keyword = dateKeywordUv._1.split("_")[1]; long uv = dateKeywordUv._2; return RowFactory.create(date, keyword, uv); } }); ArrayList fields = new ArrayList (); fields.add(DataTypes.createStructField("date", DataTypes.StringType, true)); fields.add(DataTypes.createStructField("keyword", DataTypes.StringType, true)); fields.add(DataTypes.createStructField("uv", DataTypes.LongType, true)); StructType structType = DataTypes.createStructType(fields); DataFrame dateKeywordUvDF = sqlContext.createDataFrame(dateKeywordUvRowRDD, structType); dateKeywordUvDF.registerTempTable("sales"); // 使用开窗函数,统计每天搜索uv排名前三的热点搜索词 // 日期 搜索词 人数个数 前三名 final DataFrame dailyTop3KeyWordDF = sqlContext.sql("select date,keyword,uv from (select date, keyword, uv, row_number() over (partition by date order by uv DESC ) rank from sales ) tmp_sales where rank <=3"); // 将DataFrame转换为RDD, 映射, JavaRDD dailyTop3KeyWordRDD = dailyTop3KeyWordDF.javaRDD(); JavaPairRDD
dailyTop3KeywordRDD = dailyTop3KeyWordRDD.mapToPair(new PairFunction () { private static final long serialVersionUID = 1L; @Override public Tuple2
call(Row row) throws Exception { String date = String.valueOf(row.get(0)); String keyword = String.valueOf(row.get(1)); String uv = String.valueOf(row.get(2)); // 映射为 日期 搜索词_总个数 return new Tuple2 (date, keyword + "_" + uv); } }); List > collect = dailyTop3KeywordRDD.collect(); for (Tuple2 stringStringTuple2 : collect) { System.out.println("开窗函数操作"); System.out.println(stringStringTuple2); } // 根据 日期分组 JavaPairRDD > top3DateKeywordsRDD = dailyTop3KeywordRDD.groupByKey(); // 进行映射 JavaPairRDD uvDateKeywordsRDD = top3DateKeywordsRDD.mapToPair(new PairFunction >, Long, String>() { private static final long serialVersionUID = 1L; @Override public Tuple2 call(Tuple2 > tuple) throws Exception { String date = tuple._1; // 搜索词_总个数 集合 Iterator KeyWordUviterator = tuple._2.iterator(); long totalUv = 0L; String dateKeyword = date; while (KeyWordUviterator.hasNext()) { // 搜索词_个数 String keywoarUv = KeyWordUviterator.next(); Long uv = Long.valueOf(keywoarUv.split("_")[1]); totalUv += uv; dateKeyword = dateKeyword + "," + keywoarUv; } return new Tuple2 (totalUv, dateKeyword); } }); JavaPairRDD sortedUvDateKeywordsRDD = uvDateKeywordsRDD.sortByKey(false); List > rows = sortedUvDateKeywordsRDD.collect(); for (Tuple2 row : rows) { System.out.println(row._2 + " " + row._1); } // 映射 JavaRDD resultRDD = sortedUvDateKeywordsRDD.flatMap(new FlatMapFunction
, Row>() { private static final long serialVersionUID = 1L; @Override public Iterable call(Tuple2
tuple) throws Exception { String dateKeywords = tuple._2; String[] dateKeywordsSplit = dateKeywords.split(","); String date = dateKeywordsSplit[0]; ArrayList rows = new ArrayList
(); rows.add(RowFactory.create(date, dateKeywordsSplit[1].split("_")[0], Long.valueOf(dateKeywordsSplit[1].split("_")[1]))); rows.add(RowFactory.create(date, dateKeywordsSplit[2].split("_")[0], Long.valueOf(dateKeywordsSplit[2].split("_")[1]))); rows.add(RowFactory.create(date, dateKeywordsSplit[3].split("_")[0], Long.valueOf(dateKeywordsSplit[3].split("_")[1]))); return rows; } }); // 将最终的数据,转换为DataFrame,并保存到Hive表中 DataFrame finalDF = sqlContext.createDataFrame(resultRDD, structType);// List
rows1 = finalDF.javaRDD().collect();// for (Row row : rows1) {// System.out.println(row);// } finalDF.saveAsTable("daily_top3_keyword_uv"); jsc.close(); }}