multithreading - Akka Stream Graph parallelisation
I've created a graph which contains a Balance. The Balance distributes load across 5 flows. I expected every instance of the flow to run on a separate thread. However, that is not what happens: when I print the thread name, I notice that all the flows are being executed on the same thread.
The code I'm using is:
import akka.NotUsed
import akka.stream.ClosedShape
import akka.stream.scaladsl._

RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
  import GraphDSL.Implicits._ // brings the ~> operator into scope

  val in = Source(1 to 10)
  val out = Sink.ignore

  val bal = builder.add(Balance[Int](5))
  val merge = builder.add(Merge[Int](5))

  // five separate flows, each marked with an async boundary
  val f1, f2, f3, f4, f5 = Flow[Int].map(x => {
    println(Thread.currentThread())
    x
  }).async

  in ~> bal ~> f1 ~> merge ~> out
        bal ~> f2 ~> merge
        bal ~> f3 ~> merge
        bal ~> f4 ~> merge
        bal ~> f5 ~> merge
  ClosedShape
})
This outputs:
Thread[stream_poc-akka.actor.default-dispatcher-5,5,main]
Thread[stream_poc-akka.actor.default-dispatcher-5,5,main]
Thread[stream_poc-akka.actor.default-dispatcher-5,5,main]
Thread[stream_poc-akka.actor.default-dispatcher-5,5,main]
Thread[stream_poc-akka.actor.default-dispatcher-5,5,main]
Thread[stream_poc-akka.actor.default-dispatcher-5,5,main]
Thread[stream_poc-akka.actor.default-dispatcher-5,5,main]
Thread[stream_poc-akka.actor.default-dispatcher-5,5,main]
Thread[stream_poc-akka.actor.default-dispatcher-5,5,main]
Thread[stream_poc-akka.actor.default-dispatcher-5,5,main]
I expected output along the lines of:
Thread[stream_poc-akka.actor.default-dispatcher-1,5,main]
Thread[stream_poc-akka.actor.default-dispatcher-2,5,main]
Thread[stream_poc-akka.actor.default-dispatcher-3,5,main]
Thread[stream_poc-akka.actor.default-dispatcher-4,5,main]
Thread[stream_poc-akka.actor.default-dispatcher-5,5,main]
Thread[stream_poc-akka.actor.default-dispatcher-1,5,main]
Thread[stream_poc-akka.actor.default-dispatcher-2,5,main]
Thread[stream_poc-akka.actor.default-dispatcher-3,5,main]
Thread[stream_poc-akka.actor.default-dispatcher-4,5,main]
Thread[stream_poc-akka.actor.default-dispatcher-5,5,main]
How can I change the code sample so that the flows are executed in parallel?
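The async directive does not guarantee that the stages are executed on separate threads. It only marks an asynchronous boundary: as long as the stages do not overlap in time, they might well run on the same thread.
For your specific case, the executed steps might be the following: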
- merge requests element on 1st inlet
- balance serves element through 1st flow
- merge requests element on 2nd inlet
- balance serves element through 2nd flow
- etc.
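Now, if you change the Balance as follows
val bal = builder.add(Balance[Int](5, waitForAllDownstreams = true))
you force all 5 flows to signal demand before any element is emitted, so they are active at the same time and in practice end up on 5 threads. The steps will be: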
- merge requests element on 1st inlet
- merge requests element on 2nd inlet
- merge requests element on 3rd inlet
- merge requests element on 4th inlet
- merge requests element on 5th inlet
- balance starts serving elements through flows
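
To compare the two Balance configurations yourself, something like the following sketch could help. It is not the original poster's code: the object name BalanceThreadsSketch and the helper observedThreads are hypothetical, and it assumes Akka 2.6 or later, where an implicit ActorSystem is enough to materialize a stream. Instead of printing, each flow maps its element to the name of the thread it ran on, and Sink.seq collects the names so they can be inspected after the stream completes.

```scala
import akka.actor.ActorSystem
import akka.stream.ClosedShape
import akka.stream.scaladsl.{Balance, Flow, GraphDSL, Merge, RunnableGraph, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical helper object, for illustration only.
object BalanceThreadsSketch {

  // Runs the graph once and returns the thread name observed for each element.
  def observedThreads(waitForAll: Boolean): Seq[String] = {
    // Assumption: Akka 2.6+, so the implicit system also provides the materializer.
    implicit val system: ActorSystem = ActorSystem("stream_poc")
    try {
      val graph = RunnableGraph.fromGraph(
        GraphDSL.create(Sink.seq[String]) { implicit builder => out =>
          import GraphDSL.Implicits._

          val in    = Source(1 to 10)
          val bal   = builder.add(Balance[Int](5, waitForAllDownstreams = waitForAll))
          val merge = builder.add(Merge[String](5))

          // The right-hand side is evaluated once per name, giving five
          // separate flows, each with its own async boundary.
          val f1, f2, f3, f4, f5 =
            Flow[Int].map(_ => Thread.currentThread().getName).async

          in ~> bal ~> f1 ~> merge ~> out
                bal ~> f2 ~> merge
                bal ~> f3 ~> merge
                bal ~> f4 ~> merge
                bal ~> f5 ~> merge
          ClosedShape
        })

      // Sink.seq materializes a Future holding every collected thread name.
      Await.result(graph.run(), 10.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = {
    for (waitForAll <- Seq(false, true)) {
      val names = observedThreads(waitForAll)
      println(s"waitForAllDownstreams=$waitForAll: " +
        s"${names.size} elements on ${names.distinct.size} distinct thread(s)")
    }
  }
}
```

The number of distinct threads is up to the dispatcher's scheduling, so it can differ between runs and machines; the sketch only makes the difference between the two settings easy to observe, it does not guarantee a particular thread count.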