Spark SQL DataFrame transformation involving partitioning and lagging (Scala)
I want to transform a Spark SQL DataFrame like this:
animal  value
-------------
cat     8
cat     5
cat     6
dog     2
dog     4
dog     3
rat     7
rat     4
rat     9
into a DataFrame like this:
animal  value  previous-value
-----------------------------
cat     8      0
cat     5      8
cat     6      5
dog     2      0
dog     4      2
dog     3      4
rat     7      0
rat     4      7
rat     9      4
Roughly, I want to partition by animal, then, for each animal, have previous-value lag one row behind value (with a default value of 0), and then put the partitions back together again.
This can be accomplished using a window function.
import org.apache.spark.sql.expressions.Window
import sqlContext.implicits._

val df = sc.parallelize(Seq(
  ("cat", 8, "01:00"), ("cat", 5, "02:00"), ("cat", 6, "03:00"),
  ("dog", 2, "02:00"), ("dog", 4, "04:00"), ("dog", 3, "06:00"),
  ("rat", 7, "01:00"), ("rat", 4, "03:00"), ("rat", 9, "05:00")
)).toDF("animal", "value", "time")

df.show
+------+-----+-----+
|animal|value| time|
+------+-----+-----+
|   cat|    8|01:00|
|   cat|    5|02:00|
|   cat|    6|03:00|
|   dog|    2|02:00|
|   dog|    4|04:00|
|   dog|    3|06:00|
|   rat|    7|01:00|
|   rat|    4|03:00|
|   rat|    9|05:00|
+------+-----+-----+
I've added a "time" field to illustrate the orderBy.
import org.apache.spark.sql.functions.lag

val w1 = Window.partitionBy($"animal").orderBy($"time")
val previous_value = lag($"value", 1).over(w1)
val df1 = df.withColumn("previous", previous_value)

df1.show
+------+-----+-----+--------+
|animal|value| time|previous|
+------+-----+-----+--------+
|   dog|    2|02:00|    null|
|   dog|    4|04:00|       2|
|   dog|    3|06:00|       4|
|   cat|    8|01:00|    null|
|   cat|    5|02:00|       8|
|   cat|    6|03:00|       5|
|   rat|    7|01:00|    null|
|   rat|    4|03:00|       7|
|   rat|    9|05:00|       4|
+------+-----+-----+--------+
If you want to replace the nulls with 0:
val df2 = df1.na.fill(0)

df2.show
+------+-----+-----+--------+
|animal|value| time|previous|
+------+-----+-----+--------+
|   dog|    2|02:00|       0|
|   dog|    4|04:00|       2|
|   dog|    3|06:00|       4|
|   cat|    8|01:00|       0|
|   cat|    5|02:00|       8|
|   cat|    6|03:00|       5|
|   rat|    7|01:00|       0|
|   rat|    4|03:00|       7|
|   rat|    9|05:00|       4|
+------+-----+-----+--------+
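Alternatively, the two steps can be collapsed into one: lag takes an optional third argument that supplies the default for rows with no predecessor in their partition, so the separate na.fill pass is unnecessary. A minimal sketch, assuming a Spark 2.x+ SparkSession named spark rather than the sc/sqlContext shown above:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.lag

val spark = SparkSession.builder.master("local[*]").appName("lag-default").getOrCreate()
import spark.implicits._

val df = Seq(
  ("cat", 8, "01:00"), ("cat", 5, "02:00"), ("cat", 6, "03:00"),
  ("dog", 2, "02:00"), ("dog", 4, "04:00"), ("dog", 3, "06:00"),
  ("rat", 7, "01:00"), ("rat", 4, "03:00"), ("rat", 9, "05:00")
).toDF("animal", "value", "time")

// The third argument to lag is the default value used for the first
// row of each partition, so nulls never appear in the result.
val w = Window.partitionBy($"animal").orderBy($"time")
val df2 = df.withColumn("previous", lag($"value", 1, 0).over(w))

df2.show
```

This produces the same table as df1.na.fill(0) above, in a single withColumn step.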