Spark SQL 基础
Spark SQL 特点
1. 支持多种数据源: Hive,RDD,Parquet,JSON,JDBC 2. 多种性能优化技术: in-memory columnar storage, byte-code generation, cost model 动态评估等 3. 组件扩展性: 对于 SQL 的讲法解析器,分析器以及优化器,用户都可以自己开发,并且动态扩展
Spark SQL 开发步骤
- 创建 SQLContext /HiveContext(官方推荐) 对象
SparkConf sparkConf = new SparkConf() .setAppName("...") .setMaster("local"); JavaSparkContext jsc = new JavaSparkContext(sparkConf); SQLContext sqlContext = new SQLContext(jsc);
使用 Json 文件创建 DataFrame DataFrameCreate.java
SparkConf sparkConf = new SparkConf() .setAppName("DataFrameCreate") .setMaster("local"); JavaSparkContext jsc = new JavaSparkContext(sparkConf); SQLContext sqlContext = new SQLContext(jsc); DataFrame df = sqlContext.read().json("/home/sotowang/user/aur/ide/idea/idea-IU-182.3684.101/workspace/SparkSQLProject/src/resources/students.json"); df.show();
DataFrame 常用操作
打印 DataFrame 中所有的数据
df.show();
打印 DataFrame 的元数据 (Schema)
df.printSchema();
查询某列所有数据
df.select("name").show();
查询某几列所有数据,并对列进行计算
df.select(df.col("name"),df.col("age").plus(1)).show();
根据某一列的值进行过滤
df.filter(df.col("age").gt(18)).show();
根据某一列进行分组,然后进行聚合
df.groupBy(df.col("age")).count().show();
RDD 转 DataFrame
为什么要将 RDD 转 DataFrame?
因为这样的话,我们可以直接针对 HDFS 等任何可以构建为 RDD 的数据,使用 Spark SQL 进行 SQL 查询,这个功能很强大
Spark SQL 支持 2 种方式来将 RDD 转为 DataFrame
1. 使用反射来推断包含了特定数据类型的 RDD 元数据.这种基于反射的方式,代码比较简洁,当你已经知道你的 RDD 的元素时,是一种不错的方式 2. 通过编程接口来创建 DataFrame,你可以在程序运行时动态构建一份元数据,然后将其应用到已经存在的 RDD 上,代码比较冗长, 但如果在编写程序时,还不知道 RDD 的元数据,只有在程序运行时,才能动态得知其元数据,只能通过动态构建元数据的方式
使用反射的方式将 RDD 转换为 Dataframe RDD2DataFrameReflection.java
JavaRDD<String> lines = jsc.textFile("/home/sotowang/user/aur/ide/idea/idea-IU-182.3684.101/workspace/SparkSQLProject/src/resources/students.json"); JavaRDD<Student> students = lines.map(new Function<String, Student>() { public Student call(String line) throws Exception { String[] lineSplited = line.split(","); Student stu = new Student(); stu.setId(Integer.valueOf(lineSplited[0])); stu.setName(lineSplited[1]); stu.setAge(Integer.valueOf(lineSplited[2])); return stu; } }); //使用反射方式将 RDD 转换为 DataFrame //将 Student.Class 传入进去,其实就是用反射的方式来创建 DataFrame //因为 Student.class 本身就是反射的一个应用 //然后底层还得通过对 Student Class 进行反射,来获取其中的 field DataFrame studentDF = sqlContext.createDataFrame(students, Student.class); //拿到 DataFrame 后,将其注册为一个临时表,然后针对其中的数据进行 SQL 语句 studentDF.registerTempTable("students"); //针对 students 临时表执行语句,查询年龄小于等于 18 岁的学生,就是 teenager DataFrame teenagerDF = sqlContext.sql("select * from students where age <= 18"); //将查询出的 DataFrame,再次转换为 RDD JavaRDD<Row> teenagerRDD = teenagerDF.javaRDD(); //将 RDD 中的数据,进行映射,映射为 student JavaRDD<Student> teenagerStudentRDD = teenagerRDD.map(new Function<Row, Student>() { public Student call(Row row) throws Exception { Student student = new Student(); student.setAge(row.getInt(0)); student.setName(row.getString(2)); student.setId(row.getInt(1)); return student; } }); //将数据 collect,打印出来 List<Student> studentList = teenagerStudentRDD.collect(); for (Student student : studentList) { System.out.println(student.toString()); }
出现的问题:
1.Java Bean: Student.java 的序列化问题(不序列化会报错),public class Student implements Serializable
2.将 RDD 中的数据,进行映射,映射为 student 时,其 RDD 中 student 的属性顺序会乱(与文件中顺序不一致)
以编程方式动态指定元数据,将 RDD 转换为 DataFrame
//第一步,创建一个普通的 RDD,但是,必须将其转换为 RDD<Row>的这种格式 final JavaRDD<String> lines = jsc.textFile("/home/sotowang/user/aur/ide/idea/idea-IU-182.3684.101/workspace/SparkSQLProject/src/resources/students.json"); JavaRDD<Row> studentRDD = lines.map(new Function<String, Row>() { public Row call(String line) throws Exception { String[] lineSplited = line.split(","); return RowFactory.create(Integer.valueOf(lineSplited[0]), lineSplited[1], Integer.valueOf(lineSplited[2])); } }); //第二步:动态构造元数据 //比如说,id,name 等,field 的名称和类型,可能都是在程序运行过程中,动态从 mysql,db 里 //或者是配置文件中加载的,不固定 //所以特别适合用这种编程方式,来构造元数据 List<StructField> structFields = new ArrayList<StructField>(); structFields.add(DataTypes.createStructField("id", DataTypes.IntegerType, true)); structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true)); structFields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true)); StructType structType = DataTypes.createStructType(structFields); //第三步,使用动态构造的元数据,将 RDD 转为 DataFrame DataFrame studentDF = sqlContext.createDataFrame(studentRDD, structType); studentDF.registerTempTable("students"); DataFrame teenagerDF = sqlContext.sql("select * from students where age <= 18"); List<Row> rows = teenagerDF.javaRDD().collect(); for (Row row : rows) { System.out.println(row); }
出现的问题:
1.报错:不能直接从 String 转换为 Integer 的一个类型转换错误,说明有个数据,给定义成了 String 类型,结果使用的时候 要用 Integer 类型来使用,错误报在 sql 相关的代码中.在 sql 中,用到 age<=18 语法,所以强行将 age 转换为 Integer 来使用,但之前有些步骤将 age 定义了 String
通用的 load 和 save 操作 GenericLoadSave.java
DataFrame usersDF = sqlContext.read().load("/home/sotowang/user/aur/ide/idea/idea-IU-182.3684.101/workspace/SparkSQLProject/src/resources/users.parquet"); usersDF.printSchema(); usersDF.show(); usersDF.select("name","favorite_color").write().save("/home/sotowang/Desktop/nameAndColors.parquet");
手动指定数据源类型 ManuallySpecifyOptions.java
默认为 parquet
DataFrame usersDF = sqlContext.read() .format("parquet") .load("/home/sotowang/user/aur/ide/idea/idea-IU-182.3684.101/workspace/SparkSQLProject/src/resources/users.parquet"); usersDF.select("name","favorite_color") .write() .format("json") .save("/home/sotowang/Desktop/nameAndColors");
saveMode SaveModeTest.java
SaveMode.ErrorIfExists 文件存在会报错
usersDF.save("/home/sotowang/user/aur/ide/idea/idea-IU-182.3684.101/workspace/SparkSQLProject/src/resources/users.parquet", SaveMode.ErrorIfExists);
SaveMode.Append 存在追加数据
usersDF.save("/home/sotowang/user/aur/ide/idea/idea-IU-182.3684.101/workspace/SparkSQLProject/src/resources/users.parquet", SaveMode.Append);
SaveMode.Overwrite 覆盖
SaveMode.ignore 忽略
数据源 Parquet 之使用编程方式加载数据 ParquetLoadData.java
- Parquet 是面向分析型业务的列式存储格式
列式存储与行式存储有哪些优势?
1. 可以路过不符合条件的数据,只需要读取需要的数据,降低 IO 数据量 2. 压缩编码可以降低磁盘存储空间,由于同一列的数据类型是一样的,可以使用更高效的压缩编码(如 Run Length Encoding 和 Delta Encoding) 进一步节约存储空间 3. 只读取需要的列,支持向量运算,能够获取更好的扫描性能
DataFrame usersDF = sqlContext.read().parquet("/home/sotowang/user/aur/ide/idea/idea-IU-182.3684.101/workspace/SparkSQLProject/src/resources/users.parquet"); //将 DataFrame 注册为临时表,使用 SQL 查询需要的数据 usersDF.registerTempTable("users"); DataFrame userNameDf = sqlContext.sql("select name from users "); //对查询出的 DataFrame 进行 transformation 操作,处理数据,然后打印 List<String> userNames = userNameDf.javaRDD().map(new Function<Row, String>() { public String call(Row row) throws Exception { return "Name: " + row.getString(0); } }).collect(); for (String userName : userNames) { System.out.println(userName); }
数据源 Parquet 之自动分区推断
用户也许不希望 Spark SQL 自动推断分区列的数据类型。此时只要设置一个配置即可,
spark.sql.sources.partitionColumnTypeInference.enabled
,默认为 true,即自动推断分区列的类型,设置为 false,即不会自动推断类型。 禁止自动推断分区列的类型时,所有分区列的类型,就统一默认都是 String。
数据源 Parquet 之合并元数据(默认关闭) ParquetMergeSchema.java
案例:合并学生的基本信息和成绩信息的源数据
//创建一个 DataFrame,作为学生的基本信息,并写入一个 parquet 文件中 List<Tuple2<String, Integer>> studentsWithNameAge = Arrays.asList(new Tuple2<String, Integer>("leo", 23), new Tuple2<String, Integer>("Jack", 25)); JavaRDD<Row> studentWithNameAgeRDD = jsc.parallelize(studentsWithNameAge).map(new Function<Tuple2<String, Integer>, Row>() { public Row call(Tuple2<String, Integer> student) throws Exception { return RowFactory.create(student._1, student._2); } }); List<StructField> structFields_age = new ArrayList<StructField>(); structFields_age.add(DataTypes.createStructField("name", DataTypes.StringType, true)); structFields_age.add(DataTypes.createStructField("age", DataTypes.IntegerType, true)); StructType structType_age = DataTypes.createStructType(structFields_age); //创建一个 DataFrame,作为学生的基本信息,并写入一个 parquet 文件 DataFrame studentsWithNameAgeDF = sqlContext.createDataFrame(studentWithNameAgeRDD, structType_age); studentsWithNameAgeDF.save("/home/sotowang/Desktop/students", SaveMode.Append); //创建第二个 DataFrame,作为学生的成绩信息,并写入一个 parquet 文件中 List<Tuple2<String, String>> studentsWithNameGrade = Arrays.asList(new Tuple2<String, String>("marry", "A"), new Tuple2<String, String>("tom", "B")); JavaRDD<Row> studentsWithNameGradeRDD = jsc.parallelize(studentsWithNameGrade).map(new Function<Tuple2<String, String>, Row>() { public Row call(Tuple2<String, String> student) throws Exception { return RowFactory.create(student._1, student._2); } }); List<StructField> structFields_grade = new ArrayList<StructField>(); structFields_grade.add(DataTypes.createStructField("name", DataTypes.StringType, true)); structFields_grade.add(DataTypes.createStructField("grade", DataTypes.StringType, true)); StructType structType_grade = DataTypes.createStructType(structFields_grade); //创建一个 DataFrame,作为学生的基本信息,并写入一个 parquet 文件 DataFrame studentsWithNameGradeDF = sqlContext.createDataFrame(studentsWithNameGradeRDD, structType_grade); studentsWithNameGradeDF.save("/home/sotowang/Desktop/students", SaveMode.Append); //第一个 DataFrame 和第二个 DataFrame 的元数据不一样,一个是包含了 name 和 age 两列,一个是包含了 name 和 grade 两列 //这里期望读出来的表数据,自动合并含两个文件的元数据,出现三个列,name age grade //有 mergeSchema 的方式读取 students 表中的数据,进行元数据的合并 DataFrame studentsDF = sqlContext.read().option("mergeSchema", "true").parquet("/home/sotowang/Desktop/students"); studentsDF.printSchema(); studentsDF.show();
结果
+-----+----+-----+ | name| age|grade| +-----+----+-----+ | leo| 23| null| | Jack| 25| null| |marry|null| A| | tom|null| B| +-----+----+-----+
Json 数据源 JSONDataSource.java
Spark SQL 可以自动推断 JSON 文件的元数据,并且加载其数据,创建一个 DataFrame,可以使用 SQLContext.read.json() 方法,针对一个元素类型为 String 的 RDD,或者是一个 JSON 文件
注意:这里使用的 JSON 文件与传统意义上的 JSON 文件是不一样的,每行都必须也只能包含一个单独的,包含的有效的 JSON 对象,不能让一个 JSON 对象分散在钓竿,否则会报错
案例: 查询成绩为 80 分以上的学生的基本信息与成绩信息
注:sqlContext.read().json(studentInfoJSONsRDD) ==> 该 API 可以接受一个 JavaRDD 直接转为 DataFrame,与前面所讲的反射不一样
注: 默认 DataFrame 中将数字类型转为 Long 而不是 Int,要将 Long 型转为 Integer 型需要 Integer.valueOf(String.valueOf(row.getLong(1)))
Hive 数据源 (企业常用) HiveDataSource.java
操作 Hive 使用 HiveContext 而为是 SQLContext.HiveContext 继承自 SQLContext,但是增加了在 Hive 元数据库中查找表,以及用 HiveQL 语法编写 SQL 功能,除了 sql() 方法,HiveContext 还提供了 hql() 方法,从而 Hive 语法来编译. Hive 中查询出来的数据是一个 Row 数组
将 hive-site.xml 拷贝到 spark/conf 目录下,将 mysql connector 拷贝到 spark/lib 目录下
- Spark SQL 允许将数据保存到 Hive 表中,调用 DataFrame 的 saveAsTable 命令,即可将 DaraFrame 中的数据保存到 Hive 表中.与 registerTempTable 不同, saveAsTable 是会将 DataFrame 中的数据物化到 Hive 表中的,而且还会在 Hive 元数据库中创建表的元数据
- 默认情况下,saveAsTable 会创建一张 Hive Managed Table,也就是说,数据的位置都是由元数据中的信息控制的.当 Managed Table 被删除时,表中的数据也传动一并被 物理删除
- regiserTempTable 只是注册一个临时的表,只要 Spark Application 重启或停止了,表就没了,而 saveAsTable 是物化的表,表会一直存在
- 调用 HiveContext.table() 方法,还可以直接针对 Hive 中的表,创建一个 DataFrame
案例: 查询分数大于 80 分的学生信息
- 创建 HiveContext,注意:它接收的是 sparkContext 为参数而不是 JavaSparkContext
HiveContext hiveContext = new HiveContext(jsc.sc());
- 第一个功能:使用 HiveContext 的 sql()/hql() 方法,可以执行 Hive 中能执行的 HiveQL 语句
//第一个功能:使用 HiveContext 的 sql()/hql() 方法,可以执行 Hive 中能执行的 HiveQL 语句 //判断是否存在 student_infos,若存在则删除 hiveContext.sql("DROP TABLE IF EXISTS student_infos"); //如果不存在,则创建该表 hiveContext.sql("CREATE TABLE IF NOT EXISTS student_infos(name STRING, age INT ) row format delimited fields terminated by ','"); //将学生基本信息数据导入 student_infos 表 hiveContext.sql("LOAD DATA " + " LOCAL INPATH '/home/sotowang/user/aur/ide/idea/idea-IU-182.3684.101/workspace/SparkSQLProject/src/resources/student_infos.txt' " + " INTO TABLE student_infos "); //用同样的方式给 student_scores 导入数据 hiveContext.sql("DROP TABLE IF EXISTS student_scores "); hiveContext.sql("CREATE TABLE IF NOT EXISTS student_scores(name STRING, score INT ) row format delimited fields terminated by ','"); hiveContext.sql("LOAD DATA " + " LOCAL INPATH '/home/sotowang/user/aur/ide/idea/idea-IU-182.3684.101/workspace/SparkSQLProject/src/resources/student_scores.txt' " + " INTO TABLE student_scores ");
- 第二个功能,执行 sql 还可以返回 DataFrame,用于查询
//第二个功能,执行 sql 还可以返回 DataFrame,用于查询 //执行 sql 查询,关联两张表,查询成绩大于 80 分的学生 DataFrame goodStudentDF = hiveContext.sql(" SELECT si.name name, si.age age, ss.score score " + " FROM student_infos si " + " JOIN student_scores ss ON si.name =ss.name " + " WHERE ss.score >= 80 ");
- 第三个功能,可以将 DataFrame 中的数据,理论上来说,DataFrame 对应的 RDD 元素是 Row 即可将 DataFrame 中的数据保存到 hive 表中
//第三个功能,可以将 DataFrame 中的数据,理论上来说,DataFrame 对应的 RDD 元素是 Row 即可将 DataFrame 中的数据保存到 hive 表中 //将 DataFrame 中的数据保存到 good_student_infos hiveContext.sql("DROP TABLE IF EXISTS good_student_infos "); goodStudentDF.saveAsTable("good_student_infos");
- 第四个功能:可以用 table() 方法针对 hive 表,直接创建,DataFrame
//第四个功能:可以用 table() 方法针对 hive 表,直接创建 DataFrame //然后针对 good_student_infos 表直接创建 DataFrame Row[] goodStudentsRows = hiveContext.table("good_student_infos").collect(); for (Row row : goodStudentsRows) { System.out.println(row); }
注 1:实际运行过程中出现报错如下:
MetaException(message:file:/user/hive/warehouse/src is not a directory or unable to create one)
解决方法:将 hive-site.xml 放至 resources 目录下
注 2:导入 HDFS 后,表内没数据,因为文件分隔符没有指定为空格
hiveContext.sql("CREATE TABLE IF NOT EXISTS student_infos(name STRING, age INT ) row format delimited fields terminated by ','");
JDBC 数据源 JDBCDataSource.java
案例:查询分数大于 80 分的学生信息
- mysql 创建数据库
mysql> create database testdb;
mysql> create table student_infos(name varchar(20),age integer);
mysql> create table student_scores(name varchar(20),score integer);
mysql> create table good_student_infos(name varchar(20),age integer,score integer);
mysql> insert into student_infos values('leo',18),('marry',17),('jack',19);
mysql> insert into student_scores values('leo',88),('marry',99),('jack',60);
- 读取 mysql 表
Map<String, String> options = new HashMap<String, String>(); options.put("url", "jdbc: mysql://sotowang-pc:3306/testdb"); options.put("user", "root"); options.put("password", "123456"); options.put("dbtable", "student_infos"); DataFrame studentInfoDF = sqlContext.read().format("jdbc").options(options).load();
- 将 DataFrame 中的数据保存到 Mysql 数据表中
//将 DataFrame 中的数据保存到 Mysql 数据表中 studentDF.javaRDD().foreach(new VoidFunction<Row>() { public void call(Row row) throws Exception { String sql = " insert into good_student_infos values('" + row.getString(0) + "'," + Integer.valueOf(String.valueOf(row.get(1))) + "," + Integer.valueOf(String.valueOf(row.get(2))) + ")"; Class.forName("com.mysql.jdbc.Driver"); Connection conn = null; Statement statement = null; try { conn = DriverManager.getConnection( "jdbc: mysql://sotowang-pc:3306/testdb", "root", "123456" ); statement = conn.createStatement(); statement.executeUpdate(sql); }catch (Exception e){ e.printStackTrace(); }finally { if (statement != null) { statement.close(); } if (conn != null) { conn.close(); } } } });
Spark SQL 高级内置函数
案例:根据每天的用户访问的购买日志统计每日的 uv 和销售额 (uv 指:对用户进行去重以后的访问总数)
内置函数:countDistinct() DailyUV1.java
聚合函数用法
首先对 DataFrame 调用 groupBy() 方法,对某一列进行分组,然后调用 agg() 方法,对数为内置函数,对见其源码
- 注:能过阅读 Spark agg 方法源码,得知需要手动导入 api:__ 才能使用 agg(countDistinct("userid")) 方法
import static org.apache.spark.sql.functions.countDistinct; JavaRDD<Row> userAccessLogDistinctedRowRDD = userAccessLogRowDF.groupBy("date") .agg(countDistinct("userid")) //(date,count(userid)) .javaRDD();
内置函数:sum() DailySale.java
开窗函数以及 top3 销售额统计案例
最常用的函数: row_number() 实现分组取 topN 的逻辑 RowNumberWindowFunction.java
- 创建销售额表,sales 表
hiveContext.sql("drop table if exists sales"); hiveContext.sql("create table if not exists sales ( " + " product STRING, " + " category STRING, " + "revenue BIGINT ) "); hiveContext.sql("load data " + " local inpath '/home/sotowang/user/aur/ide/idea/idea-IU-182.3684.101/workspace/SparkSQLProject/src/resources/sales.txt' " + " into table sales "); ```* 使用 row_number() 开窗函数 row_number() 作用就是给你一份每个分组的数据 按照其排序打上一个分组内的行号 比如:有一个分组 date=20181001,里面有 3 条数据,1122,1121,1124, 那么对这个分组的每一行使用 row_num() 开窗函数以后,三等依次会获得一个组内行号行号从 1 开始递增, 比如 1122 1,1121 2,1124 3 ```java DataFrame top3SalesDF = hiveContext.sql("" + "select product,category,revenue " + " from ( " + " select " + " product," + "category," + "revenue," + //row_number() 语法说明: //首先,在 select 查询时,使用 row_number() 函数 //其次,row_number() 后面跟上 over 关键字 //然后,括号中,是 partition by ,根据哪个字段进行分组 //其次,order by 起行组内排序 //然后,row_number() 就可以给组内行一个行号 "row_number() over (partition by category order by revenue desc ) rank " + "from sales " + " ) tmp_sales " + "where rank <=3 "); //将每组前三的数据保存到一个表中 hiveContext.sql("drop table if exists top3_sales "); top3SalesDF.saveAsTable("top3_sales");
Spark SQL 与 Hive On Spark
Spark SQL:
1. Spark 自己研发出来的,针对各种数据源:Hive,Parquet,JDBC,RDD 等都可以执行查询,一套基于 Spark 计算引擎的查询引擎 2. 它是一个 Spark 项目,只不过是提供了针对 Hive 执行查询的功能而已 3. 适用于 Spark 技术栈的大数据应用类系统,舆情分析系统,风控系统,用户行为分析系统...
Hive On Spark:
1. 是一个 Hive 项目,指不通过 MapReduce 作为唯一的查询引擎,而是将 Spark 作为底层的查询引擎 2. Hive On Spark,只适用于 Spark,在可预见的未来,很有可能 Hive 默认的底层引擎就从 MapReduce 切换为 Spark 了 3. 适合将原有的 Hive 数据仓库以及数据统计分析替换为 Spark 引擎,作为全公司通用的大数据分析引擎
Spark SQL 与 Spark Core 合并 DailyTop3KeyWord.java
案例: 每日 top3 热点搜索词统计案例实战
数据格式:
日期 用户 搜索词 城市 平台 版本
需求:
1. 筛选出符合查询条件的数据 2. 统计出每天搜索 uv 排名前 3 的搜索词 3. 按照每天的 top3 搜索词搜索总次数,侄是序排序 4. 将数据保存到 hive 表中
思路分析:
1. 针对原始数据(HDFS 文件),获取输入的 RDD 2. 使用 filter 牌子,去针对输入 RDD 中的数据,进行数据过滤,过滤出符合查询条件的数据 2.1 普通的做法:直接在 filter 牌子函数中,使用外部的查询条件(Map),但这样做的话,查询条件 Map 会发送到每一个 task 一个副本 2.2 优化后的做法:将查询条件封装为 Broadcast 广播变量,在 filter 算子中,使用 Broadcast 广播变量 3. 将数据转换为"(日期_搜索词,用户) "的格式然后转换进行分级,然后再次进行映射,对每天每个搜索词的用户进行去重操作,并统计去重后的数量,即为每天每个搜索词的 uv,最后获得"(日期_搜索词,uv)" 4. 将得到的每天每个搜索词的 uv,RDD,映射为元素类型为 Row 的 RDD,将该 RDD 转换为 DataFrame 5. 将 DataFrame 注册为临时表,使用 Spark SQL 的开窗函数,来统计每天的 uv 数量排名前 3 的搜索词,以及它的搜索 uv,最后获取,是一个 DataFrame 6. 将 DataFrame 转换为 RDD,继续操作按照每天日期来进行分级,并进行映射,计算出每天的 top3 搜索词的搜索 uv 的总数,然后 uv 总数为 key,将每天 top3 搜索词以及搜索次数拼接为一个字符串 7. 按照每天的 top3 搜索总 uv,再次映射回来,变成"日期_搜索词_uv"的格式 8. 再次映射为 DataFra,e,并将数据保存到 Hive 中
- 遇到的问题:
- 在生成 hive 表时显示,hdfs 文件已存在
Exception in thread "main" org.apache.spark.sql.AnalysisException: path hdfs://sotowang-pc:9000/user/hive/warehouse/daily_top3_keyword_uv already exists.;
解决方法:
hadoop fs -rm -r /user/hive/warehouse/daily_top3_keyword_uv
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

上一篇: Java HashMap 详细介绍
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论