multithreading - Akka Stream Graph parallelisation -


i've created graph whcih contains balance. balance distributes load on 5 flows. expected happen every instance of flow run on seperate thread. however, not happens. when i'm printing thread name notice flows being executed on same thread.

the code i'm using is:

runnablegraph.fromgraph(graphdsl.create() { implicit builder: graphdsl.builder[notused] => val in = source(1 10)   val out = sink.ignore    val bal = builder.add(balance[int](5))   val merge = builder.add(merge[int](5))    val f1, f2, f3, f4, f5 = flow[int].map(x => {     println(thread.currentthread())     x   }).async    in ~> bal ~> f1 ~> merge ~> out   bal ~> f2 ~> merge   bal ~> f3 ~> merge   bal ~> f4 ~> merge   bal ~> f5 ~> merge    closedshape }) 

this outputs:

thread[stream_poc-akka.actor.default-dispatcher-5,5,main]

thread[stream_poc-akka.actor.default-dispatcher-5,5,main]

thread[stream_poc-akka.actor.default-dispatcher-5,5,main]

thread[stream_poc-akka.actor.default-dispatcher-5,5,main]

thread[stream_poc-akka.actor.default-dispatcher-5,5,main]

thread[stream_poc-akka.actor.default-dispatcher-5,5,main]

thread[stream_poc-akka.actor.default-dispatcher-5,5,main]

thread[stream_poc-akka.actor.default-dispatcher-5,5,main]

thread[stream_poc-akka.actor.default-dispatcher-5,5,main]

thread[stream_poc-akka.actor.default-dispatcher-5,5,main]

my expectation output along lines of:

thread[stream_poc-akka.actor.default-dispatcher-1,5,main]

thread[stream_poc-akka.actor.default-dispatcher-2,5,main]

thread[stream_poc-akka.actor.default-dispatcher-3,5,main]

thread[stream_poc-akka.actor.default-dispatcher-4,5,main]

thread[stream_poc-akka.actor.default-dispatcher-5,5,main]

thread[stream_poc-akka.actor.default-dispatcher-1,5,main]

thread[stream_poc-akka.actor.default-dispatcher-2,5,main]

thread[stream_poc-akka.actor.default-dispatcher-3,5,main]

thread[stream_poc-akka.actor.default-dispatcher-4,5,main]

thread[stream_poc-akka.actor.default-dispatcher-5,5,main]

how can change code sample flows being executed in parallel?

the async directive not guarantee stages executed in separated thread. long stages not overlap in time, might run on same thread.

for specific case, executed steps might following:

  • merge requests element on 1st inlet
  • balance serves element through 1st flow
  • merge requests element on 2nd inlet
  • balance serves element through 2nd flow
  • etc.

now if change balance follows

val bal = builder.add(balance[int](5, waitforalldownstreams = true)) 

you forcing 5 threads spawned, steps be

  • merge requests element on 1st inlet
  • merge requests element on 2nd inlet
  • merge requests element on 3rd inlet
  • merge requests element on 4th inlet
  • merge requests element on 5th inlet
  • balance starts serving elements through flows

Comments

Popular posts from this blog

account - Script error login visual studio DefaultLogin_PCore.js -

xcode - CocoaPod Storyboard error: -