创建操作

Create

使用一个函数从头开始创建一个Observable

示例(冷流)

1
2
3
4
5
6
7
8
9
10
Observable.Create<int>(o =>
{
o.OnNext(1);
o.OnNext(2);
o.OnCompleted();
return Disposable.Create(() => Debug.Log("观察者已取消订阅"));
}).Subscribe(xx =>
{
Debug.Log(xx);
});

示例(热流)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class test : MonoBehaviour
{
List<IObserver<int>> _subscribed = new List<IObserver<int>>();
IObservable<int> aa;
void Start()
{
CreateHot();
aa.Subscribe(xx => { Debug.Log(xx); });
onNext(5);
onNext(4);
}
void CreateHot()
{
aa=Observable.Create<int>(o =>
{
_subscribed.Add(o);
return Disposable.Create(() => _subscribed.Remove(o));
});
}
private void onNext(int val)
{
foreach (var o in _subscribed)
{
o.OnNext(val);
}
}
}

Defer

直到有观察者订阅时才创建Observable,并且为每个观察者创建一个新的Observable

Defer操作符会一直等待直到有观察者订阅它,然后它使用Observable工厂方法生成一个Observable。它对每个观察者都这样做,因此尽管每个订阅者都以为自己订阅的是同一个Observable,事实上每个订阅者获取的是它们自己的单独的数据序列。
示例

1
2
3
4
5
var random = new System.Random();
Observable.Defer(() => Observable.Start(() => random.Next()))
.Delay(TimeSpan.FromMilliseconds(1000))
.Repeat()
.Subscribe(_=>Debug.Log(_));

Empty/Never/Throw

Empty

创建一个不发射任何数据但是正常终止的Observable

1
2
3
4
5
6
7
8
public static IObservable<T> Empty<T>()
{
return Observable.Create<T>(o =>
{
o.OnCompleted();
return Disposable.Empty;
});
}

Never

创建一个不发射数据也不终止的Observable

1
2
3
4
5
6
7
public static IObservable<T> Never<T>()
{
return Observable.Create<T>(o =>
{
return Disposable.Empty;
});
}

Throw

创建一个不发射数据以一个错误终止的Observable

1
2
3
4
5
6
7
8
public static IObservable<T> Throws<T>(Exception exception)
{
return Observable.Create<T>(o =>
{
o.OnError(exception);
return Disposable.Empty;
});
}

这三个操作符生成的Observable行为非常特殊和受限。测试的时候很有用,有时候也用于结合其它的Observables,或者作为其它需要Observable的操作符的参数。

From

将其它种类的对象和数据类型转换为Observable

当你使用Observable时,如果你要处理的数据都可以转换成展现为Observables,而不是需要混合使用Observables和其它类型的数据,会非常方便。这让你在数据流的整个生命周期中,可以使用一组统一的操作符来管理它们。

FromCoroutine
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class test : MonoBehaviour
{
void Start()
{
var cancel = Observable.FromCoroutine(AsyncA)
.SelectMany(AsyncB)
.Subscribe();
cancel.Dispose();
}
IEnumerator AsyncA()
{
Debug.Log("a start");
yield return new WaitForSeconds(1);
Debug.Log("a end");
}
IEnumerator AsyncB()
{
Debug.Log("b start");
yield return new WaitForEndOfFrame();
Debug.Log("b end");
}
}
FromAsyncPattern
FromCoroutineValue
FromEventPattern
FromMicroCoroutine
FromEvent
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class test : MonoBehaviour
{
void Start()
{
LogCallbackAsObservable()
.Where(x => x.LogType == LogType.Warning)
.Subscribe();
LogCallbackAsObservable()
.Where(x => x.LogType == LogType.Error)
.Subscribe();
}
public static IObservable<LogCallback> LogCallbackAsObservable()
{
return Observable.FromEvent<Application.LogCallback, LogCallback>(
h => (condition, stackTrace, type) => h(new LogCallback { Condition = condition, StackTrace = stackTrace, LogType = type }),
h => Application.logMessageReceived += h, h => Application.logMessageReceived -= h);
}
public class LogCallback
{
public string Condition;
public string StackTrace;
public UnityEngine.LogType LogType;
}
}

Interval

创建一个按固定时间间隔发射整数序列的Observable

Interval操作符返回一个Observable,它按固定的时间间隔发射一个无限递增的整数序列。
示例

1
2
3
4
5
void Start()
{
Observable.Interval(TimeSpan.FromMilliseconds(1000))
.Subscribe(_ => { Debug.Log(_); });
}

Return

创建一个发射指定值的Observable

1
2
3
4
5
6
7
8
9
public static IObservable<T> Return<T>(T value)
{
return Observable.Create<T>(o =>
{
o.OnNext(value);
o.OnCompleted();
return Disposable.Empty;
});
}

Just将单个数据转换为发射那个数据的Observable。

return类似于From,但是From会将数组或Iterable的素具取出然后逐个发射,而return只是简单的原样发射,将数组或Iterable当做单个数据。

1
2
3
4
5
void Start()
{
var singleValue = Observable.Return<string>("Value");
singleValue.Subscribe(xx => Debug.Log(xx));
}

Range

创建一个发射特定整数序列的Observable

Range操作符发射一个范围内的有序整数序列,你可以指定范围的起始和长度。

1
2
3
4
5
void Start()
{
var singleValue = Observable.Range(10, 15);
singleValue.Subscribe(xx => Debug.Log(xx));
}

Repeat

创建一个发射特定数据重复多次的Observable

Repeat重复地发射数据。某些实现允许你重复的发射某个数据序列,还有一些允许你限制重复的次数。

1
2
3
4
5
void Start()
{
Observable.Repeat("aa",3)
.Subscribe(xx => Debug.Log(xx));
}

Start

返回一个Observable,它发射一个类似于函数声明的值

注意:这个函数只会被执行一次,即使多个观察者订阅这个返回的Observable。

1
2
3
4
5
6
7
void Start()
{
Observable.Start(() =>
{
Debug.Log("lalala");
}).Subscribe();
}

Timer

创建一个Observable,它在一个给定的延迟后发射一个特殊的值。

Timer操作符创建一个在给定的时间段之后返回一个特殊值的Observable。

1
2
3
4
5
6
7
8
9
10
11
void Start()
{
Observable.Timer(TimeSpan.FromSeconds(3))
.Subscribe(_=> Debug.Log(_));
}
void Start()
{
Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(3))
.Subscribe(_=> Debug.Log(_));
}

变换操作

Buffer

定期收集Observable的数据放进一个数据包裹,然后发射这些数据包裹,而不是一次发射一个值。

Buffer操作符将一个Observable变换为另一个,原来的Observable正常发射数据,变换产生的Observable发射这些数据的缓存集合。Buffer操作符在很多语言特定的实现中有很多种变体,它们在如何缓存这个问题上存在区别。

1
2
3
4
5
6
void Start()
{
Observable.Range(0, 10)
.Buffer(3)
.Subscribe(xx => Debug.Log(xx[0]));
}

SelectMany

SelectMany将一个发射数据的Observable变换为多个Observables,然后将它们发射的数据合并后放进一个单独的Observable

SelectMany操作符使用一个指定的函数对原始Observable发射的每一项数据执行变换操作,这个函数返回一个本身也发射数据的Observable,然后SelectMany合并这些Observables发射的数据,最后将合并后的结果当做它自己的数据序列发射。
这个方法是很有用的,例如,当你有一个这样的Observable:它发射一个数据序列,这些数据本身包含Observable成员或者可以变换为Observable,因此你可以创建一个新的Observable发射这些次级Observable发射的数据的完整集合。

注意:SelectMany对这些Observables发射的数据做的是合并(merge)操作,因此它们可能是交错的。

1
2
3
4
5
6
7
8
9
void Start()
{
Observable.Return(3)
.SelectMany(i => Observable.Range(1, i))
.Subscribe(xx =>
{
Debug.Log("SelectMany:" + xx);
});
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class test : MonoBehaviour
{
void Start()
{
Observable.Interval(TimeSpan.FromSeconds(1))
.Select(i => i + 1)
.Take(3)
.SelectMany(GetSubValues(3))
.Subscribe(xx =>
{
Debug.Log("SelectMany:" + xx);
});
}
private IObservable<long> GetSubValues(long offset)
{
return Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(4))
.Select(x => (offset * 10) + x)
.Take(3);
}
}

GroupBy

将一个Observable分拆为一些Observables集合,它们中的每一个发射原始Observable的一个子序列

GroupBy操作符将原始Observable分拆为一些Observables集合,它们中的每一个发射原始Observable数据序列的一个子序列。哪个数据项由哪一个Observable发射是由一个函数判定的,这个函数给每一项指定一个Key,Key相同的数据会被同一个Observable发射。

1
2
3
4
5
6
7
void Start()
{
var source = Observable.Interval(TimeSpan.FromSeconds(0.1)).Take(10);
var group = source.GroupBy(i => i%3);
group.Subscribe(
xx => Debug.Log(xx.Key));
}

Cast

cast操作符将原始Observable发射的每一项数据都强制转换为一个指定的类型,然后再发射数据;

1
2
3
4
5
6
void Start()
{
Observable.Return(0)
.Cast("s")
.Subscribe();
}

对应的有OfType或者通过.Select(i=>(int)i)转换。

1
2
3
4
5
6
7
8
9
10
void Start()
{
var ss = new object[] {1, "s", 0.2f, 3, "g"};
ss.ToObservable()
.OfType(2)
.Subscribe(xx =>
{
Debug.Log(xx);
});
}

Aggregate

聚合方法允许您对序列应用累加器函数
对于基本过载,您需要提供一个函数,该函数获取累加值的当前状态和序列推送的值。该函数的结果是新的累加值。

1
2
3
4
5
6
7
8
9
void Start()
{
Observable.Range(0, 8)
.Aggregate(0,(acc,currentValue)=>acc+5)
.Subscribe(xx =>
{
Debug.Log(xx);
});
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class test : MonoBehaviour
{
void Start()
{
Observable.Range(0, 8)
.MyMax()
.Subscribe(xx =>
{
Debug.Log(xx);
});
}
}
public static class tt
{
public static IObservable<T> MyMax<T>(this IObservable<T> source)
{
var comparer = Comparer<T>.Default;
Func<T, T, T> max =
(x, y) =>
{
if (comparer.Compare(x, y) < 0)
{
return y;
}
return x;
};
return source.Aggregate(max);
}
}

Scan

连续地对数据序列的每一项应用一个函数,然后连续发射结果

Scan操作符对原始Observable发射的第一项数据应用一个函数,然后将那个函数的结果作为自己的第一项数据发射。它将函数的结果同第二项数据一起填充给这个函数来产生它自己的第二项数据。它持续进行这个过程来产生剩余的数据序列。这个操作符在某些情况下被叫做accumulator。

1
2
3
4
5
6
7
8
9
void Start()
{
Observable.Range(0, 5)
.Scan((x, y) => x + y)
.Subscribe(xx =>
{
Debug.Log(xx);
});
}

过滤操作

Throttle

仅在过了一段指定的时间还没发射数据时才发射一个数据

Debounce操作符会过滤掉发射速率过快的数据项。

1
2
3
4
5
6
7
8
void Start()
{
var clickStream = Observable.EveryUpdate()
.Where(_ => Input.GetMouseButtonDown(0));
clickStream.Buffer(clickStream.Throttle(TimeSpan.FromMilliseconds(250)))
.Where(xs => xs.Count >= 2)
.Subscribe(xs => Debug.Log("click count:" + xs.Count));
}

Distinct

抑制(过滤掉)重复的数据项

Distinct的过滤规则是:只允许还没有发射过的数据项通过。

在某些实现中,有一些变体允许你调整判定两个数据不同(distinct)的标准。还有一些实现只比较一项数据和它的直接前驱,因此只会从序列中过滤掉连续重复的数据。

1
2
3
4
5
6
7
8
9
10
void Start()
{
var ss = new int[] {1, 2, 5, 5, 6, 8, 9, 6};
ss.ToObservable()
.Distinct()
.Subscribe(xx =>
{
Debug.Log(xx);
});
}

DistinctUntilChanged


只判定一个数据和它的直接前驱是否是不同的。

DistinctUntilChanged(func)


根据一个函数产生的Key判定两个相邻的数据项是不是不同的。

Where

只发射通过了谓词测试的数据项

Where操作符使用你指定的一个谓词函数测试数据项,只有通过测试的数据才会被发射。

1
2
3
4
5
6
7
8
9
void Start()
{
Observable.Range(1, 10)
.Where(x => x >= 5)
.Subscribe(xx =>
{
Debug.Log(xx);
});
}

First

只发射第一项(或者满足某个条件的第一项)数据

如果你只对Observable发射的第一项数据,或者满足某个条件的第一项数据感兴趣,你可以使用First操作符。

在某些实现中,First没有实现为一个返回Observable的过滤操作符,而是实现为一个在当时就发射原始Observable指定数据项的阻塞函数。在这些实现中,如果你想要的是一个过滤操作符,最好使用Take(1)
在一些实现中还有一个Single操作符。它的行为与First类似,但为了确保只发射单个值,它会等待原始Observable终止(否则,不是发射那个值,而是以一个错误通知终止)。你可以使用它从原始Observable获取第一项数据,而且也确保只发射一项数据。

1
2
3
4
5
6
7
8
9
10
void Start()
{
Observable.Range(1, 10)
.Where(x => x >= 5)
.First()
.Subscribe(xx =>
{
Debug.Log(xx);
});
}

FirstOrDefault

firstOrDefault与first类似,但是在Observagle没有发射任何数据时发射一个你在参数中指定的默认值。

single

single操作符也与first类似,但是如果原始Observable在完成之前不是正好发射一次数据,它会抛出一个NoSuchElementException。

singleOrDefault

和firstOrDefault类似,但是如果原始Observable发射超过一个的数据,会以错误通知终止。

IgnoreElements

不发射任何数据,只发射Observable的终止通知

IgnoreElements操作符抑制原始Observable发射的所有数据,只允许它的终止通知(onError或onCompleted)通过。

如果你不关心一个Observable发射的数据,但是希望在它完成时或遇到错误终止时收到通知,你可以对Observable使用ignoreElements操作符,它会确保永远不会调用观察者的onNext()方法。

Last

只发射最后一项(或者满足某个条件的最后一项)数据

如果你只对Observable发射的最后一项数据,或者满足某个条件的最后一项数据感兴趣,你可以使用Last操作符。

在某些实现中,Last没有实现为一个返回Observable的过滤操作符,而是实现为一个在当时就发射原始Observable指定数据项的阻塞函数。在这些实现中,如果你想要的是一个过滤操作符,最好使用TakeLast(1)。

1
2
3
4
5
6
7
8
9
10
void Start()
{
Observable.Range(1, 10)
.Where(x => x >= 5)
.Last()
.Subscribe(xx =>
{
Debug.Log(xx);
});
}

LastOrDefault

lastOrDefault与last类似,不同的是,如果原始Observable没有发射任何值,它发射你指定的默认值。

Sample

定期发射Observable最近发射的数据项

Sample操作符定时查看一个Observable,然后发射自上次采样以来它最近发射的数据。

在某些实现中,有一个ThrottleFirst操作符的功能类似,但不是发射采样期间的最近的数据,而是发射在那段时间内的第一项数据。
注意:如果自上次采样以来,原始Observable没有发射任何数据,这个操作返回的Observable在那段时间内也不会发射任何数据。

1
2
3
4
5
6
7
8
9
void Start()
{
Observable.Timer(TimeSpan.FromSeconds(1))
.Sample(TimeSpan.FromSeconds(3))
.Subscribe(xx =>
{
Debug.Log(xx);
});
}
1
2
3
4
5
6
7
8
9
void Start()
{
Observable.Range(0,5)
.Sample(Observable.Range(1,2))
.Subscribe(xx =>
{
Debug.Log(xx);
});
}
SampleFrame
1
2
3
4
5
6
7
8
9
10
11
void Start()
{
var cube = GameObject.CreatePrimitive(PrimitiveType.Cube);
cube.AddComponent<ObservableUpdateTrigger>()
.UpdateAsObservable()
.SampleFrame(30)
.Subscribe(x => Debug.Log("cube"), () => Debug.Log("destroy"));
GameObject.Destroy(cube, 3f);
}

Skip

抑制Observable发射的前N项数据

使用Skip操作符,你可以忽略Observable’发射的前N项数据,只保留之后的数据。

1
2
3
4
5
6
7
8
9
void Start()
{
Observable.Range(0,5)
.Skip(3)
.Subscribe(xx =>
{
Debug.Log(xx);
});
}

Take

只发射前面的N项数据

使用Take操作符让你可以修改Observable的行为,只返回前面的N项数据,然后发射完成通知,忽略剩余的数据。

TakeLast

发射Observable发射的最后N项数据

使用TakeLast操作符修改原始Observable,你可以只发射Observable’发射的后N项数据,忽略前面的数据。

使用takeLast操作符,你可以只发射原始Observable发射的后N项数据,忽略之前的数据。注意:这会延迟原始Observable发射的任何数据项,直到它全部完成。

takeLast的这个变体默认不在任何特定的调度器上执行。

结合操作

CombineLatest

当两个Observables中的任何一个发射了数据时,使用一个函数结合每个Observable发射的最近数据项,并且基于这个函数的结果发射数据。

CombineLatest操作符行为类似于zip,但是只有当原始的Observable中的每一个都发射了一条数据时zip才发射数据。CombineLatest则在原始的Observable中任意一个发射了数据时发射一条数据。当原始Observables的任何一个发射了一条数据时,CombineLatest使用一个函数结合它们最近发射的数据,然后发射这个函数的返回值。

1
2
3
4
5
6
7
8
void Start()
{
var GiveName = new ReactiveProperty<string>("bob");
var FamilyName = new ReactiveProperty<string>("rock");
var fullName = GiveName.CombineLatest(FamilyName, (x, y) => x + " " + y)
.ToReactiveProperty();
Debug.Log(fullName);
}

WhenAll

将两个或多个Observable发射的数据集合并到一起,操作符组合的行为类似于zip,但是它们使用一个中间数据结构。接受两个或多个Observable,一次一个将它们的发射物合并到Pattern对象,然后操作那个Pattern对象,变换为一个Plan。随后将这些Plan变换为Observable的发射物。

1
2
3
4
5
6
7
8
9
10
void Start()
{
var heavyMethod = Observable.Start(() => { return 10; });
var heavyMethod2 = Observable.Start(() => { return 20; });
Observable.WhenAll(heavyMethod, heavyMethod2)
.Subscribe(xs =>
{
Debug.Log(xs[0] + ":" + xs[1]);
});
}

Merge

合并多个Observables的发射物

使用Merge操作符你可以将多个Observables的输出合并,就好像它们是一个单个的Observable一样。

Merge可能会让合并的Observables发射的数据交错(有一个类似的操作符Concat不会让数据交错,它会按顺序一个接着一个发射多个Observables的发射物)。

正如图例上展示的,任何一个原始Observable的onError通知会被立即传递给观察者,而且会终止合并后的Observable。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void Start()
{
var s1 = Observable.Return(10);
var s2 = Observable.Return(5);
var ss = s1.Merge(s2)
.Finally(() =>
{
Debug.Log("finished!");
});
ss.Subscribe(xx =>
{
Debug.Log(xx);
});
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public static class Ext
{
public static IObservable<T> MergeWithCompleteOnEither<T>(this IObservable<T> source, IObservable<T> right)
{
var completed = Observable.Throw<T>(new StreamCompletedException());
return
source.Concat(completed)
.Merge(right.Concat(completed))
.Catch((StreamCompletedException ex) => Observable.Empty<T>());
}
private sealed class StreamCompletedException : Exception
{
}
}

StartWith

在数据序列的开头插入一条指定的项

如果你想要一个Observable在发射数据之前先发射一个指定的数据序列,可以使用StartWith操作符。(如果你想一个Observable发射的数据末尾追加一个数据序列可以使用Concat操作符。)

可接受一个Iterable或者多个Observable作为函数的参数。

1
2
3
4
5
6
7
8
9
10
11
void Start()
{
var s1 = Observable.Return(10);
var s2 = Observable.Return(5);
var ss = s1.Merge(s2)
.StartWith(8);
ss.Subscribe(xx =>
{
Debug.Log(xx);
});
}

Switch

将一个发射多个Observables的Observable转换成另一个单独的Observable,后者发射那些Observables最近发射的数据项

Switch订阅一个发射多个Observables的Observable。它每次观察那些Observables中的一个,Switch返回的这个Observable取消订阅前一个发射数据的Observable,开始发射最近的Observable发射的数据。注意:当原始Observable发射了一个新的Observable时(不是这个新的Observable发射了一条数据时),它将取消订阅之前的那个Observable。这意味着,在后来那个Observable产生之后到它开始发射数据之前的这段时间里,前一个Observable发射的数据将被丢弃(就像图例上的那个黄色圆圈一样)。

Zip

通过一个函数将多个Observables的发射物结合到一起,基于这个函数的结果为每个结合体发射单个数据项。

Zip操作符返回一个Obversable,它使用这个函数按顺序结合两个或多个Observables发射的数据项,然后它发射这个函数返回的结果。它按照严格的顺序应用这个函数。它只发射与发射数据项最少的那个Observable一样多的数据。

1
2
3
4
5
6
7
8
9
10
11
void Start()
{
var stream1 = Observable.Interval(TimeSpan.FromMilliseconds(250)).Take(3);
var stream2 = Observable.Interval(TimeSpan.FromMilliseconds(150)).Take(6).Select(i => Char.ConvertFromUtf32((int)i + 97));
stream1
.Zip(stream2, (lhs, rhs) => new {Left = lhs, Right = rhs})
.Subscribe(xx =>
{
Debug.Log(xx);
});
}

错误处理

Catch

从onError通知中恢复发射数据

Catch操作符拦截原始Observable的onError通知,将它替换为其它的数据项或数据序列,让产生的Observable能够正常终止或者根本不终止。

Retry

如果原始Observable遇到错误,重新订阅它期望它能正常终止

Retry操作符不会将原始Observable的onError通知传递给观察者,它会订阅这个Observable,再给它一次机会无错误地完成它的数据序列。Retry总是传递onNext通知给观察者,由于重新订阅,可能会造成数据项重复

辅助操作

Delay

延迟一段指定的时间再发射来自Observable的发射物

Delay操作符让原始Observable在发射每项数据之前都暂停一段指定的时间段。效果是Observable发射的数据项在时间上向前整体平移了一个增量。
第一种delay接受一个定义时长的参数(包括数量和单位)。每当原始Observable发射一项数据,delay就启动一个定时器,当定时器过了给定的时间段时,delay返回的Observable发射相同的数据项。

注意:delay不会平移onError通知,它会立即将这个通知传递给订阅者,同时丢弃任何待发射的onNext通知。然而它会平移一个onCompleted通知。

delay默认在computation调度器上执行,你可以通过参数指定使用其它的调度器。

另一种delay不实用常数延时参数,它使用一个函数针对原始Observable的每一项数据返回一个Observable,它监视返回的这个Observable,当任何那样的Observable终止时,delay返回的Observable就发射关联的那项数据。

这种delay默认不在任何特定的调度器上执行。

1
2
3
4
5
6
7
8
9
void Start()
{
Observable.Range(0, 5)
.Delay(TimeSpan.FromSeconds(2))
.Subscribe(xx =>
{
Debug.Log(xx);
});
}

Do

注册一个动作作为原始Observable生命周期事件的一种占位符

你可以注册回调,当Observable的某个事件发生时,Rx会在与Observable链关联的正常通知集合中调用它。Rx实现了多种操作符用于达到这个目的。

1
2
3
4
5
6
7
void Start()
{
var elements = new[] {1, 2, 3};
elements.ToObservable()
.Do(e => Debug.Log(e))
.Subscribe();
}

Materialize/Dematerialize

Materialize将数据项和事件通知都当做数据项发射,Dematerialize刚好相反。

一个合法的有限的Obversable将调用它的观察者的onNext方法零次或多次,然后调用观察者的onCompleted或onError正好一次。Materialize操作符将这一系列调用,包括原来的onNext通知和终止通知onCompleted或onError都转换为一个Observable发射的数据序列。

Dematerialize操作符是Materialize的逆向过程,它将Materialize转换的结果还原成它原本的形式。

dematerialize反转这个过程,将原始Observable发射的Notification对象还原成Observable的通知。

1
2
3
4
5
6
7
8
9
10
11
12
13
void Start()
{
var subject = new Subject<int>();
var onlyExceptions = subject.Materialize().Where(n => n.Exception != null).Dematerialize();
subject.Subscribe(i => Debug.LogFormat("Subscriber 1: {0}", i),
ex => Debug.LogFormat("Subscriber 1 exception: {0}",ex.Message));
onlyExceptions.Subscribe(i => Debug.LogFormat("Subscriber 2: {0}", i),
ex => Debug.LogFormat("Subscriber 2 exception: {0}", ex.Message));
subject.OnNext(123);
subject.OnError(new Exception("Test Exception"));
}

ObserveOn

指定一个观察者在哪个调度器上观察这个Observable

很多ReactiveX实现都使用调度器
“Scheduler”来管理多线程环境中Observable的转场。你可以使用ObserveOn操作符指定Observable在一个特定的调度器上发送通知给观察者 (调用观察者的onNext, onCompleted, onError方法)。

注意:当遇到一个异常时ObserveOn会立即向前传递这个onError终止通知,它不会等待慢速消费的Observable接受任何之前它已经收到但还没有发射的数据项。这可能意味着onError通知会跳到(并吞掉)原始Observable发射的数据项前面,正如图例上展示的。

SubscribeOn操作符的作用类似,但它是用于指定Observable本身在特定的调度器上执行,它同样会在那个调度器上给观察者发通知。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
void Start()
{
var heavyMethod = Observable.Start(() =>
{
System.Threading.Thread.Sleep(TimeSpan.FromSeconds(1));
return 10;
});
var heavyMethod2 = Observable.Start(() =>
{
System.Threading.Thread.Sleep(TimeSpan.FromSeconds(3));
return 10;
});
Observable.WhenAll(heavyMethod, heavyMethod2)
.ObserveOnMainThread() // return to main thread
.Subscribe(xs =>
{
Debug.Log(xs[0] + ":" + xs[1]);
});
}

SubscribeOn

指定Observable自身在哪个调度器上执行

很多ReactiveX实现都使用调度器 “Scheduler”来管理多线程环境中Observable的转场。你可以使用SubscribeOn操作符指定Observable在一个特定的调度器上运转。

ObserveOn操作符的作用类似,但是功能很有限,它指示Observable在一个指定的调度器上给观察者发通知。

TimeInterval

将一个发射数据的Observable转换为发射那些数据发射时间间隔的Observable

TimeInterval操作符拦截原始Observable发射的数据项,替换为发射表示相邻发射物时间间隔的对象。

1
2
3
4
5
6
7
8
void Start()
{
Observable.Interval(TimeSpan.FromMilliseconds(750))
.TimeInterval()
.Do(x => Debug.Log(x))
.Subscribe();
}

Timeout

对原始Observable的一个镜像,如果过了一个指定的时长仍没有发射数据,它会发一个错误通知

如果原始Observable过了指定的一段时长没有发射任何数据,Timeout操作符会以一个onError通知终止这个Observable。

Timestamp

给Observable发射的数据项附加一个时间戳

timestamp默认在immediate调度器上执行,但是可以通过参数指定其它的调度器。

Using

创建一个只在Observable生命周期内存在的一次性资源

Using操作符让你可以指示Observable创建一个只在它的生命周期内存在的资源,当Observable终止时这个资源会被自动释放。

using操作符接受三个参数:

一个用户创建一次性资源的工厂函数
一个用于创建Observable的工厂函数
一个用于释放资源的函数
当一个观察者订阅using返回的Observable时,using将会使用Observable工厂函数创建观察者要观察的Observable,同时使用资源工厂函数创建一个你想要创建的资源。当观察者取消订阅这个Observable时,或者当观察者终止时(无论是正常终止还是因错误而终止),using使用第三个函数释放它创建的资源。

using默认不在任何特定的调度器上执行。

To

将Observable转换为另一个对象或数据结构

ToArray
ToAsync
ToAwaitableEnumerator
ToList
ToObservable
ToYieldInstruction