RxJava - RxAndroid: download multiple files with at most 3 concurrent threads
I have an API that downloads a single MP3 file from the server, which I consume using RxJava as shown below.
    Observable<ResponseBody> observable = audioService.getFile(fileNameWithExtension);
    observable.subscribeOn(Schedulers.newThread())
        .observeOn(Schedulers.newThread())
        .subscribe(someCallbackClass); // a Subscriber<ResponseBody>
This downloads a single file, and the callback saves it to disk. I want to download a list of files, save each file to disk, and wait until all downloads complete, with at most 3 calls executing in parallel. How can I do this with RxAndroid? I tried flatMap but was not able to understand it fully.
Edit: new code
    List<Observable<Response<ResponseBody>>> audioFiles = new ArrayList<>();
    for (String fileNameWithExtension : fileNamesWithExtension) {
        Observable<Response<ResponseBody>> observable =
                RestFactory.getAudioService().getFile(fileNameWithExtension);
        audioFiles.add(observable);
    }

    Observable.from(audioFiles)
        .flatMap(audioFile -> Observable.fromCallable(() -> {
                audioFile.subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .toBlocking()
                    .subscribe(new CallbackWithErrorHandling<>(Downloader.this));
                return 0;
            }).subscribeOn(Schedulers.io()), MAX_CONCURRENT)
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
                goToMainActivity();
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "something went wrong , " + Thread.currentThread().getName());
                Log.e(TAG, "something went wrong , " + e.toString());
                showToast(R.string.something_went_wrong);
                goToMainActivity();
            }

            @Override
            public void onNext(Integer integer) {
            }
        });
This works fine, but when the network is down or the internet connection is slow, I get:
    java.lang.RuntimeException: Can't create handler inside thread that has not called Looper.prepare()
I am unable to understand which thread needs observeOn() for the Android main thread.
You can achieve this with flatMap by limiting its concurrency, but you need an inner Observable running on a background scheduler to do the file transfer:
    fileNames
        .flatMap(name -> {
            return Observable.fromCallable(() -> {
                // put your blocking download code here, save the data
                return name; // return what you need down below
            })
            .subscribeOn(Schedulers.io());
        }, 3)
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(completedFile -> { }, error -> { }, () -> { /* completed. */ });
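To make the effect of the concurrency limit concrete, here is a minimal sketch of the same idea using only the JDK, not RxJava: a fixed pool of `maxConcurrent` threads plays the role of the second argument to flatMap. The class `BoundedDownloads`, the `Result` type, and `fakeDownload` are hypothetical names made up for illustration, and the sleep merely stands in for a blocking download. Unlike flatMap, which emits results as they complete, this sketch collects them in submission order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

final class BoundedDownloads {

    /** Names that finished, plus the highest number of tasks seen running at once. */
    static final class Result {
        final List<String> completed;
        final int peakConcurrency;

        Result(List<String> completed, int peakConcurrency) {
            this.completed = completed;
            this.peakConcurrency = peakConcurrency;
        }
    }

    // Hypothetical stand-in for the blocking "download and save to disk" step.
    static String fakeDownload(String name) throws InterruptedException {
        Thread.sleep(50);
        return name;
    }

    static Result downloadAll(List<String> names, int maxConcurrent) throws Exception {
        // The pool size caps how many downloads can run at the same time.
        ExecutorService pool = Executors.newFixedThreadPool(maxConcurrent);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (String name : names) {
                futures.add(pool.submit(() -> {
                    // Record how many tasks are active right now.
                    int now = running.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max);
                    try {
                        return fakeDownload(name);
                    } finally {
                        running.decrementAndGet();
                    }
                }));
            }
            // Wait for every download; a failure surfaces here as an ExecutionException.
            List<String> completed = new ArrayList<>();
            for (Future<String> future : futures) {
                completed.add(future.get());
            }
            return new Result(completed, peak.get());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        List<String> names = Arrays.asList("a.mp3", "b.mp3", "c.mp3", "d.mp3", "e.mp3");
        Result result = downloadAll(names, 3);
        System.out.println("completed: " + result.completed);
        System.out.println("peak concurrency: " + result.peakConcurrency);
    }
}
```

The recorded peak can never exceed the pool size, which is the same guarantee the concurrency argument gives for the inner Observables subscribed by flatMap.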
Edit:
Since you are using an Observable API for the network download, you don't need to block:
    Observable.from(audioFiles)
        .flatMap(audioFile ->
                audioFile.subscribeOn(Schedulers.io()), // <-- apply your transforms here
            MAX_CONCURRENT)
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(completedFile -> { }, error -> { }, () -> { /* completed. */ });
It is unclear, though, what CallbackWithErrorHandling does.