WedX - журнал о программировании и компьютерных науках

В RxJava запускается только одно событие, даже если подписано несколько наблюдаемых

Я пытаюсь понять RxJava. Я написал небольшой фрагмент кода, который будет загружать файлы списка на сервер firebase. Я написал два наблюдаемых: 1. ImageUploaderService.java: загрузить один файл и сообщить о статусе 2. AllImageUploaderService.java: создать новый загрузчик одного файла и передать ему один путь к файлу для загрузки. Соберите результаты всех загрузчиков и сообщите вызывающему с помощью ссылки загруженных файлов.

Моя проблема заключается в том, что даже если каждый экземпляр ImageUploaderService подписан в AllImageUploaderService, запускается только одно событие OnNext. Как я могу это исправить?

AllImageUploaderService.java

    public class AllImageUploaderService extends Subscriber<String>{
    private BehaviorSubject<String[]> uploadService;
    private ArrayList<String> uploadedFiles;
    private int expectedCount;
    private int actualCount;

    public Observable<String[]> uploadImages(final String[] fileNames, final StorageReference imagesRef,
        final String[] chosenImage) {
        expectedCount = fileNames.length;
        uploadedFiles = new ArrayList<>(fileNames.length);

        uploadService = BehaviorSubject.create();

        Observable<String[]> observable = Observable.create(new Observable.OnSubscribe<String[]>(){
            @Override
            public void call(Subscriber<? super String[]> subscriber) {
                for (int i = 0; i < fileNames.length; i++) {
                    ImageUploaderService imageUploaderService=new ImageUploaderService();
                    Observable<String> observable= imageUploaderService.uploadImage(fileNames[i],imagesRef,chosenImage[i]);
                    observable.subscribe(AllImageUploaderService.this);
                }
            }
        });
        observable.subscribe(uploadService);
        return uploadService;
    }

    @Override
    public void onCompleted() {
        actualCount++;
        if(expectedCount==actualCount) {
            String[] retValue = new String[uploadedFiles.size()];
            uploadedFiles.toArray(retValue);
            uploadService.onNext(retValue);
            uploadService.onCompleted();
        }
    }

    @Override
    public void onError(Throwable e) {
        Utils.logE(this,"Error",e);
    }

    @Override
    public void onNext(String s) {
        uploadedFiles.add(s);
    }
}

ImageUploaderService.java

public class ImageUploaderService implements OnFailureListener, OnSuccessListener<UploadTask.TaskSnapshot> {

    private BehaviorSubject<String> uploadSubject;

    public Observable<String> uploadImage(final String fileNames, final StorageReference imagesRef,
        final String chosenImage) {

        uploadSubject = BehaviorSubject.create();
        Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {               
                StorageReference spaceRef = imagesRef.child(fileNames);
                UploadTask uploadTask = spaceRef.putFile(UriUtil.generatorUri(chosenImage, UriUtil.LOCAL_FILE_SCHEME));
                uploadTask.addOnFailureListener(ImageUploaderService.this);
                uploadTask.addOnSuccessListener(ImageUploaderService.this);
            }
        });
        observable.subscribe(uploadSubject);
        return uploadSubject;
    }

    @Override
    public void onFailure(@NonNull Exception exception) {
        uploadSubject.onError(exception);
        uploadSubject.onCompleted();
    }

    @Override
    public void onSuccess(UploadTask.TaskSnapshot taskSnapshot) {
        Uri downloadUrl = taskSnapshot.getDownloadUrl();
        uploadSubject.onNext(downloadUrl.toString());
        uploadSubject.onCompleted();
    }
}

Было бы здорово, если бы вы могли сообщить мне лучшие подходы к тому же.

17.10.2016

  • Попробуйте удалить onCompleted. Потому что onCompleted — терминальное событие. Никакие значения не будут переданы по одному и тому же субъекту повторно. 17.10.2016
  • @BharathMg, это очень хороший способ получить наблюдаемые, которые никогда не завершатся. 17.10.2016

Ответы:


1

Не нужно ничего усложнять:

public Observable<String> uploadImages(final String[] fileNames, final StorageReference imagesRef,
    final String[] chosenImage) {

    return Observable
        .range(0, fileNames.length)
        .flatMap(i -> new ImageUploaderService()
             .uploadImage(fileNames[i],imagesRef,chosenImage[i]))
}

Вам это нужно в качестве коллекции, когда все закончится? .toList().

Кроме того, я думаю, что вам не нужна тема в ImageUploaderService:

public class ImageUploader implements OnFailureListener, OnSuccessListener<UploadTask.TaskSnapshot> {

  public static Observable<String> uploadImage(final String fileNames, final StorageReference imagesRef,
    final String chosenImage) {

    Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {               
            ImageUploader uploader = new ImageUploader(subscriber);
            StorageReference spaceRef = imagesRef.child(fileNames);
            UploadTask uploadTask = spaceRef.putFile(UriUtil.generatorUri(chosenImage, UriUtil.LOCAL_FILE_SCHEME));
            uploadTask.addOnFailureListener(uploader);
            uploadTask.addOnSuccessListener(uploader);
        }
    });
    return observable;
  }

  /////////

  private final Subscriber<String> subscriber;

  private ImageUploader(final Subscriber<String> subscriber) {
    this.subscriber = subscriber;
  }

  @Override
  public void onFailure(@NonNull Exception exception) {
    if(subscriber.isUnsubscribed()) return;
    subscriber.onError(exception);
  }

  @Override
  public void onSuccess(UploadTask.TaskSnapshot taskSnapshot) {
    if(subscriber.isUnsubscribed()) return;
    Uri downloadUrl = taskSnapshot.getDownloadUrl();
    uploadSubject.onNext(downloadUrl.toString());
    uploadSubject.onCompleted();
  }
}

Обратите внимание: не выдавайте onComplete() после onError(), любое из них является конечным состоянием для Observables, и любое из них должно быть последним.

17.10.2016
  • ваш код работал для меня. Не могли бы вы указать, что не так с моим кодом, кроме onComplete() после onError()? 21.10.2016
  • Вы подписываетесь, а затем возвращаете исходный наблюдаемый объект. Это вызовет 2 разные подписки, и если вы явно не планируете это через публикацию/общий доступ/кэш, вы получите несколько разных подписок. 21.10.2016
  • Новые материалы

    Как проанализировать работу вашего классификатора?
    Не всегда просто знать, какие показатели использовать С развитием глубокого обучения все больше и больше людей учатся обучать свой первый классификатор. Но как только вы закончите..

    Работа с цепями Маркова, часть 4 (Машинное обучение)
    Нелинейные цепи Маркова с агрегатором и их приложения (arXiv) Автор : Бар Лайт Аннотация: Изучаются свойства подкласса случайных процессов, называемых дискретными нелинейными цепями Маркова..

    Crazy Laravel Livewire упростил мне создание электронной коммерции (панель администратора и API) [Часть 3]
    Как вы сегодня, ребята? В этой части мы создадим CRUD для данных о продукте. Думаю, в этой части я не буду слишком много делиться теорией, но чаще буду делиться своим кодом. Потому что..

    Использование машинного обучения и Python для классификации 1000 сезонов новичков MLB Hitter
    Чему может научиться машина, глядя на сезоны новичков 1000 игроков MLB? Это то, что исследует это приложение. В этом процессе мы будем использовать неконтролируемое обучение, чтобы..

    Учебные заметки: создание моего первого пакета Node.js
    Это мои обучающие заметки, когда я научился создавать свой самый первый пакет Node.js, распространяемый через npm. Оглавление Глоссарий I. Новый пакет 1.1 советы по инициализации..

    Забудьте о Matplotlib: улучшите визуализацию данных с помощью умопомрачительных функций Seaborn!
    Примечание. Эта запись в блоге предполагает базовое знакомство с Python и концепциями анализа данных. Привет, энтузиасты данных! Добро пожаловать в мой блог, где я расскажу о невероятных..

    ИИ в аэрокосмической отрасли
    Каждый полет – это шаг вперед к великой мечте. Чтобы это происходило в их собственном темпе, необходима команда астронавтов для погони за космосом и команда технического обслуживания..


    Для любых предложений по сайту: [email protected]