scala - Spark SQL DataFrame transformation involving partitioning and lagging -


i want transform spark sql dataframe this:

animal value ------------ cat        8 cat        5 cat        6 dog        2 dog        4 dog        3 rat        7 rat        4  rat        9 

into dataframe this:

animal value   previous-value ----------------------------- cat        8                0 cat        5                8 cat        6                5 dog        2                0 dog        4                2 dog        3                4    rat        7                0 rat        4                7 rat        9                4  

i sort of want partition animal, , then, each animal, previous-value lags 1 row behind value (with default value of 0), , put partitions again.

this can accomplished using window function.

import org.apache.spark.sql.expressions.window import sqlcontext.implicits._  val df = sc.parallelize(seq(("cat", 8, "01:00"),("cat", 5, "02:00"),("cat", 6, "03:00"),("dog", 2, "02:00"),("dog", 4, "04:00"),("dog", 3, "06:00"),("rat", 7, "01:00"),("rat", 4, "03:00"),("rat", 9, "05:00"))).todf("animal", "value", "time")  df.show +------+-----+-----+ |animal|value| time| +------+-----+-----+ |   cat|    8|01:00| |   cat|    5|02:00| |   cat|    6|03:00| |   dog|    2|02:00| |   dog|    4|04:00| |   dog|    3|06:00| |   rat|    7|01:00| |   rat|    4|03:00| |   rat|    9|05:00| +------+-----+-----+ 

i've added "time" field illustrate orderby.

val w1 = window.partitionby($"animal").orderby($"time")  val previous_value = lag($"value", 1).over(w1) val df1 = df.withcolumn("previous", previous_value)  df1.show +------+-----+-----+--------+                                                    |animal|value| time|previous| +------+-----+-----+--------+ |   dog|    2|02:00|    null| |   dog|    4|04:00|       2| |   dog|    3|06:00|       4| |   cat|    8|01:00|    null| |   cat|    5|02:00|       8| |   cat|    6|03:00|       5| |   rat|    7|01:00|    null| |   rat|    4|03:00|       7| |   rat|    9|05:00|       4| +------+-----+-----+--------+ 

if want replace nulls 0:

val df2 = df1.na.fill(0) df2.show +------+-----+-----+--------+ |animal|value| time|previous| +------+-----+-----+--------+ |   dog|    2|02:00|       0| |   dog|    4|04:00|       2| |   dog|    3|06:00|       4| |   cat|    8|01:00|       0| |   cat|    5|02:00|       8| |   cat|    6|03:00|       5| |   rat|    7|01:00|       0| |   rat|    4|03:00|       7| |   rat|    9|05:00|       4| +------+-----+-----+--------+ 

Comments

Popular posts from this blog

account - Script error login visual studio DefaultLogin_PCore.js -

xcode - CocoaPod Storyboard error: -