Рассмотрим следующее:
[Fact]
public void foo()
{
var result = new Subject<bool>();
var startCount = 0;
var completionCount = 0;
var obs = Observable
.Defer(() =>
{
++startCount;
return result.FirstAsync();
})
.Do(_ => ++completionCount)
.Publish()
.RefCount();
// pretend there are lots of subscribers at once
var s1 = obs.Subscribe();
var s2 = obs.Subscribe();
var s3 = obs.Subscribe();
// even so, we only expect to be started once
Assert.Equal(1, startCount);
Assert.Equal(0, completionCount);
// and we won't complete until the result ticks through
result.OnNext(true);
Assert.Equal(1, startCount);
Assert.Equal(1, completionCount);
s1.Dispose();
s2.Dispose();
s3.Dispose();
// now try exactly the same thing again
s1 = obs.Subscribe();
s2 = obs.Subscribe();
s3 = obs.Subscribe();
// startCount is 4 here instead of the expected 2!
Assert.Equal(2, startCount);
Assert.Equal(1, completionCount);
result.OnNext(true);
Assert.Equal(2, startCount);
Assert.Equal(2, completionCount);
s1.Dispose();
s2.Dispose();
s3.Dispose();
}
Мое понимание Publish
+ RefCount
заключается в том, что соединение с источником сохраняется до тех пор, пока есть хотя бы один подписчик. Как только последний подписчик отключится, любой будущий подписчик повторно инициирует подключение к источнику.
Как вы можете видеть в моем тесте, все работает отлично с первого раза. Но во второй раз отложенная наблюдаемая внутри пайплайна выполняется один раз для каждого нового подписчика.
С помощью отладчика я вижу, что для первой группы подписчиков obs._count
(которая считается подписчиками) увеличивается с каждым вызовом до Subscribe
. А вот для второй группы абонентов она остается нулевой.
Почему это происходит и что я могу сделать, чтобы исправить мой конвейер?