scala - List processing in DStream
I have a DStream of lists of words, e.g. List(car, speed, accident, speed, bad). I want to form bigrams from this list. I have done this with RDDs, but I am facing issues with DStreams. I am using the foreachRDD function. Below is what I have.
I am trying to print the contents of the RDD after the transformation.
import org.apache.spark.rdd.RDD

def printRDD(rddString: RDD[String]) = {
  val z = rddString
    .map(y => y.toString.split(",").filter(_.nonEmpty)
      .map(y => y.replaceAll("""\W""", "").toLowerCase)
      .filter(_.nonEmpty)
      .sliding(2).filter(_.size == 2)
      .map { case Array(a, b) => ((a, b), 1) })
    .flatMap(x => x)
  println(z)
}

val x = lines.map(plainTextToLemmas(_, stopWords))
val words = x.flatMap(y => y.toString.split(","))
words.foreachRDD(rdd => printRDD(rdd))
Is there a way to show the contents after the transformation in the function printRDD? If I use println(z) inside the printRDD definition, it returns MapPartitionsRDD[18] at flatMap. I am using Kafka with Spark Streaming to read the inputs, and I get the words value on the console. I think the words are not changed after invoking the function printRDD.
You can do these operations on the DStream itself, not inside foreachRDD, and call print on the DStream:
lines
  .map(plainTextToLemmas(_, stopWords))
  .flatMap { lemmas =>
    // sliding is a Scala collection method, not a DStream method,
    // so the bigrams are built per record inside flatMap
    lemmas.toString.split(",")
      .map(_.replaceAll("""\W""", "").toLowerCase)
      .filter(_.nonEmpty)
      .sliding(2)
      .filter(_.size == 2)
      .map { case Array(a, b) => ((a, b), 1) }
  }
  .print()
This should print the content of the DStream (by default, the first ten elements of each batch) to the console on the driver.
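The bigram step itself is plain Scala collection logic, so it can be checked without Spark. Here is a minimal sketch, assuming the words of one record are already available as a Seq[String]; the helper name bigrams is hypothetical and not part of the original post:

```scala
// Hypothetical helper: builds ((w1, w2), 1) pairs from consecutive words.
def bigrams(words: Seq[String]): List[((String, String), Int)] =
  words
    .map(_.replaceAll("""\W""", "").toLowerCase) // strip non-word characters
    .filter(_.nonEmpty)                          // drop tokens that became empty
    .sliding(2)                                  // windows of two consecutive words
    .filter(_.size == 2)                         // a lone leftover word forms no bigram
    .map { case Seq(a, b) => ((a, b), 1) }
    .toList

val sample = List("car", "speed", "accident", "speed", "bad")
bigrams(sample).foreach(println)
// prints:
// ((car,speed),1)
// ((speed,accident),1)
// ((accident,speed),1)
// ((speed,bad),1)
```

The same chain can then be used as the body of a flatMap on the DStream, since flatMap only needs a function from one record to a collection of results.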
An important thing to note is that although you're operating on a DStream, its methods "drill into" the underlying RDD at a given batch time and expose the actual type inside the RDD, so you shouldn't need to use foreachRDD to reach the actual data inside.
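If foreachRDD is kept, as in the question, note that println on an RDD prints only its description (such as MapPartitionsRDD[18]). An action like take or collect is needed to bring data to the driver. Here is a minimal sketch, assuming each record is still a comma-separated string of words and that Spark is on the classpath (for example in spark-shell); toBigrams and sampleBigrams are hypothetical names:

```scala
import org.apache.spark.rdd.RDD

// Hypothetical helper: turns one comma-separated record into ((w1, w2), 1) pairs.
def toBigrams(record: String): List[((String, String), Int)] =
  record.split(",")
    .map(_.replaceAll("""\W""", "").toLowerCase)
    .filter(_.nonEmpty)
    .sliding(2)
    .filter(_.length == 2)
    .map { case Array(a, b) => ((a, b), 1) }
    .toList

// Hypothetical helper: fetches up to `limit` bigram pairs of a batch to the driver.
// take is an action, so the transformation is actually executed.
def sampleBigrams(rdd: RDD[String], limit: Int = 10): Array[((String, String), Int)] =
  rdd.flatMap(toBigrams).take(limit)

// Prints data rather than the RDD's description.
def printRDD(rdd: RDD[String]): Unit =
  sampleBigrams(rdd).foreach(println)
```

With the names from the question, this could be wired in as something like x.map(_.toString).foreachRDD(printRDD _), applied before the records are split into single words. Using take instead of collect keeps the amount of data pulled to the driver bounded.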