Introduction
This post collects common basic operations on PySpark DataFrames, using Hive as the data source in the examples.
 
Hive data source information
View information about the Hive data source:
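```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()  # Hive metastore access
spark.sql("show databases").show()
```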
 
Common DataFrame operations
Custom UDFs on a DataFrame
A udf (user-defined function) lets you apply your own business-logic function to the columns of a DataFrame.
```python
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

# Assumes an existing SparkSession named spark (e.g. the pyspark shell)
data = [(1, 1.0, None), (1, 2.0, 5), (2, 3.0, 5), (2, 5.0, 5), (2, 10.0, 5)]
cols = ("id", "v", "f")
df = spark.createDataFrame(data, cols)
df.show()
# +---+----+----+
# | id|   v|   f|
# +---+----+----+
# |  1| 1.0|null|
# |  1| 2.0|   5|
# |  2| 3.0|   5|
# |  2| 5.0|   5|
# |  2|10.0|   5|
# +---+----+----+

def is_even(val):
    """Return True for even values, False for odd values and null (None)."""
    try:
        return (val % 2) == 0
    except TypeError:  # None % 2 raises TypeError
        return False

# Wrap the Python function as a UDF that returns a boolean column
udf_is_even = udf(is_even, returnType=BooleanType())
df.withColumn("is_v_even", udf_is_even(df["v"])).show()
# +---+----+----+---------+
# | id|   v|   f|is_v_even|
# +---+----+----+---------+
# |  1| 1.0|null|    false|
# |  1| 2.0|   5|     true|
# |  2| 3.0|   5|    false|
# |  2| 5.0|   5|    false|
# |  2|10.0|   5|     true|
# +---+----+----+---------+
```
 
DataFrame column operations
Renaming a column: withColumnRenamed
withColumnRenamed(existing, new): Returns a new DataFrame by renaming an existing column. This is a no-op if schema doesn’t contain the given column name.
Parameters:
- existing – string, name of the existing column to rename.
- new – string, new name of the column.
 
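```python
# Returns a new DataFrame; df keeps its original column name "v",
# which the following examples still use
df_renamed = df.withColumnRenamed("v", "val")
```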
Adding a column: withColumn
withColumn(colName, col): Returns a new DataFrame by adding a column or replacing the existing column that has the same name. The column expression must be an expression over this DataFrame; attempting to add a column from some other dataframe will raise an error.
Parameters:
- colName – string, name of the new column.
- col – a Column expression for the new column.
 
```python
from pyspark.sql.functions import concat_ws, lit

# Output below is abbreviated to two rows
df.show()
# +---+----+----+
# | id|   v|   f|
# +---+----+----+
# |  1| 1.0|null|
# |  2|10.0|   5|
# +---+----+----+

# Column computed by the udf_is_even UDF defined earlier
df.withColumn("is_v_even", udf_is_even(df["v"])).show()
# +---+----+----+---------+
# | id|   v|   f|is_v_even|
# +---+----+----+---------+
# |  1| 1.0|null|    false|
# |  2|10.0|   5|     true|
# +---+----+----+---------+

# Constant column via lit()
df.withColumn("constants", lit("Hello")).show()
# +---+----+----+---------+
# | id|   v|   f|constants|
# +---+----+----+---------+
# |  1| 1.0|null|    Hello|
# |  2|10.0|   5|    Hello|
# +---+----+----+---------+

# Arithmetic between columns; null in either operand gives null
df.withColumn("v+f", df["v"] + df["f"]).show()
# +---+----+----+----+
# | id|   v|   f| v+f|
# +---+----+----+----+
# |  1| 1.0|null|null|
# |  2|10.0|   5|15.0|
# +---+----+----+----+

# Join values with a separator; concat_ws skips nulls
df.withColumn("Concat_v_f", concat_ws("-", "v", "f", "id")).show()
# +---+----+----+----------+
# | id|   v|   f|Concat_v_f|
# +---+----+----+----------+
# |  1| 1.0|null|     1.0-1|
# |  2|10.0|   5|  10.0-5-2|
# +---+----+----+----------+
```
 
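Replacing a column
If the name passed to withColumn is already an existing column, that column is replaced instead of a new one being added.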
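FAQ
1. Data ends up in the wrong columns after a union of two DataFrames.
DataFrame.union merges two DataFrames by column position only and ignores column names. If the columns of the two DataFrames are not in exactly the same order, values end up under the wrong columns. To match columns by name, use unionByName (added in PySpark 2.3).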
```python
df_1 = spark.createDataFrame([("Dan", 28)], ("name", "age"))
df_2 = spark.createDataFrame([(18, "John")], ("age", "name"))

# union matches columns by position, so df_2's age lands under "name"
df_1.union(df_2).show()
# +----+----+
# |name| age|
# +----+----+
# | Dan|  28|
# |  18|John|
# +----+----+
```
 