Я пытаюсь понять RxJava. Я написал небольшой фрагмент кода, который будет загружать файлы списка на сервер firebase. Я написал два наблюдаемых: 1. ImageUploaderService.java: загрузить один файл и сообщить о статусе 2. AllImageUploaderService.java: создать новый загрузчик одного файла и передать ему один путь к файлу для загрузки. Соберите результаты всех загрузчиков и сообщите вызывающему с помощью ссылки загруженных файлов.
Моя проблема заключается в том, что даже если каждый экземпляр ImageUploaderService подписан в AllImageUploaderService, запускается только одно событие OnNext. Как я могу это исправить?
AllImageUploaderService.java
public class AllImageUploaderService extends Subscriber<String>{
private BehaviorSubject<String[]> uploadService;
private ArrayList<String> uploadedFiles;
private int expectedCount;
private int actualCount;
public Observable<String[]> uploadImages(final String[] fileNames, final StorageReference imagesRef,
final String[] chosenImage) {
expectedCount = fileNames.length;
uploadedFiles = new ArrayList<>(fileNames.length);
uploadService = BehaviorSubject.create();
Observable<String[]> observable = Observable.create(new Observable.OnSubscribe<String[]>(){
@Override
public void call(Subscriber<? super String[]> subscriber) {
for (int i = 0; i < fileNames.length; i++) {
ImageUploaderService imageUploaderService=new ImageUploaderService();
Observable<String> observable= imageUploaderService.uploadImage(fileNames[i],imagesRef,chosenImage[i]);
observable.subscribe(AllImageUploaderService.this);
}
}
});
observable.subscribe(uploadService);
return uploadService;
}
@Override
public void onCompleted() {
actualCount++;
if(expectedCount==actualCount) {
String[] retValue = new String[uploadedFiles.size()];
uploadedFiles.toArray(retValue);
uploadService.onNext(retValue);
uploadService.onCompleted();
}
}
@Override
public void onError(Throwable e) {
Utils.logE(this,"Error",e);
}
@Override
public void onNext(String s) {
uploadedFiles.add(s);
}
}
ImageUploaderService.java
public class ImageUploaderService implements OnFailureListener, OnSuccessListener<UploadTask.TaskSnapshot> {
private BehaviorSubject<String> uploadSubject;
public Observable<String> uploadImage(final String fileNames, final StorageReference imagesRef,
final String chosenImage) {
uploadSubject = BehaviorSubject.create();
Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
StorageReference spaceRef = imagesRef.child(fileNames);
UploadTask uploadTask = spaceRef.putFile(UriUtil.generatorUri(chosenImage, UriUtil.LOCAL_FILE_SCHEME));
uploadTask.addOnFailureListener(ImageUploaderService.this);
uploadTask.addOnSuccessListener(ImageUploaderService.this);
}
});
observable.subscribe(uploadSubject);
return uploadSubject;
}
@Override
public void onFailure(@NonNull Exception exception) {
uploadSubject.onError(exception);
uploadSubject.onCompleted();
}
@Override
public void onSuccess(UploadTask.TaskSnapshot taskSnapshot) {
Uri downloadUrl = taskSnapshot.getDownloadUrl();
uploadSubject.onNext(downloadUrl.toString());
uploadSubject.onCompleted();
}
}
Было бы здорово, если бы вы могли сообщить мне лучшие подходы к тому же.