Quarkus のリアクティブ版で実装していると Mutiny の Uni や Multi と付き合っていく必要があります。Mutiny の公式ドキュメントに一通り使い方は書かれていますが、英語の読解力に自信がないので実際に使ってまとめてみました。Uni と Multi がありますがまずは Uni のみ調査しました。
チートシート
いきなりチートシートを書いて、説明は下に書く構成にしています。
Uni を作る
やりたいこと | メソッド |
---|---|
Uni を作る | Uni.createFrom().item(value) |
Uni を作る | Uni.createFrom().item(() -> value) |
null の Uni を作る | Uni.createFrom().voidItem() |
処理をつなげる
やりたいこと | メソッド |
---|---|
処理実行 (uni 使わない) | uni.invoke(x -> {}) |
処理実行 (uni を使う) | uni.call(x -> uni2) |
値を変換 (uni 使わない) | uni.map(x -> y) |
値を変換 (uni を使う) | uni.chain(x -> uni2) |
失敗させたい時 | uni.onItem().failWith(x -> exception) |
null (void) だったら違う値にする | uni.replaceIfNullWith(x) |
null (void) だったら違う値にする | uni.replaceIfNullWith(() -> x) |
失敗した時
やりたいこと | メソッド |
---|---|
失敗した時の処理(uni 使わない) | uni.onFailure().invoke(x -> {}) |
失敗した時の処理(uni を使う) | uni.onFailure().call(x -> uni2) |
失敗した時に特定の値に置き換えて処理継続 | uni.onFailure().recoverWithItem(item) |
失敗した時に特定の値に置き換えて処理継続 | uni.onFailure().recoverWithItem(() -> item) |
失敗しても成功しても呼んでほしい
やりたいこと | メソッド |
---|---|
uni 使わない | uni.onItemOrFailure().invoke((item, exception) -> {}) |
uni を使う | uni.onItemOrFailure().call((item, exception) -> uni2) |
invoke の省略 (引数使わない) | uni.eventually(() -> {}) |
call の省略 (引数使わない) | uni.eventually(() -> uni2) |
用語
言葉 | 概要 |
---|---|
Uni | 1つの要素を取り扱う処理 |
Multi | 複数の要素を取り扱う処理 |
invoke | 処理実行 |
call | 処理実行 (Uni を繋ぐ場合) |
map | 値の変換をする |
chain | 値の変換をする (Uni を繋ぐ場合。 Multi にはない) |
subscribe | Uni / Multi を開始し、イベントを購読する |
subscriber | subscribe で登録されたイベント購読処理 |
onItem | 1つの要素の準備ができた(Uni の場合は1回、Multi の場合は複数回発生) |
onFailure | 失敗した |
onCompletion | Multi において、すべての要素の用意が完了した(Uni にはない) |
onSubscription | イベントを購読された |
onCancellation | subscriber から中止要求が出された |
subscriber が受信できるイベントは UniSubscriber を見る限り onSubscribe / onItem / onFailure の模様。
基本的な書き方
Uni を書き始める時、最初は何かを呼んだ時になんか Uni 型で返ってきて Uni を返却してあげる必要があるらしいくらいの認識だと思うので、Uni をもらってから返すまでの間の処理を書けるようになる必要があります。
基本的な書き方は「~の時」「~を実行する」という2つの組み合わせを連続してつなげていきます。
public Uni<Integer> run() {
Uni<Integer> uni = Uni.createFrom().item(1);
return uni
.onItem().invoke(i -> {})
.onItem().call(i -> Uni.createFrom().voidItem())
.onItem().transform(i -> i)
.onItem().transformToUni(i -> Uni.createFrom().item(i))
.onFailure().invoke(exception -> {});
}
onItem() は正常にアイテムを用意できた場合、 onFailure() は例外が発生してアイテムを用意できなかった場合という意味です。
onItem や onFailure の次には「何を実行するか」を続けて書きます。invoke の場合は処理を実行し、 transform の場合は取得したアイテムを元に別のアイテムに変換します。call は invoke の Uni を返すバージョン、 transformToUni は transform の Uni を返す版です。同期処理の場合は前者を、非同期処理を続けたい場合は後者を使うイメージです。
この書き方だと長くなるのでいくつかショートカットが用意されています。
uni.invoke()
はuni.onItem().invoke()
のショートカットuni.call()
はuni.onItem().call()
のショートカットuni.map()
はuni.onItem().transform()
のショートカットuni.chain()
はuni.onItem().transformToUni()
のショートカット
Uni を実行する
Uni を使って処理を書いただけでは処理は実行されません1これが CompletableFuture との主な違いだと紹介されています。では実行開始するにはどうすればいいかというと 2つのパターンがあります。
- subscribe : 非同期実行、ノンブロッキング
- await : 同期実行、ブロッキング
Uni<Object> uni = 何らかの処理();
// subscribe による非同期実行
Uni.createFrom().item("A")
.chain(i -> uni.map(e -> i))
.subscribe().with(i -> Log.info(i));
Log.info("B");
// await による同期実行
var result = Uni.createFrom().item("C")
.chain(i -> uni.map(e -> i))
.await().indefinitely();
Log.info(result);
Log.info("D");
この例の場合、 B → C → D の順序は保証されますが、 A がどのタイミングで出力されるかは不定です。 A の出力は uni 変数の処理時間によって前後します。
例外の投げ方
- throw を使う
- failWith(x -> exception) を使う
map または invoke では必然的に throw を使うことになります。 Uni を返却できないためです。
失敗した時
失敗した場合は onFailure() を使って例外を処理します。
// A, C, D が実行される
Uni.createFrom().item(1)
.onItem().failWith(i -> new RuntimeException())
.onFailure().invoke(() -> Log.info("A"))
.onItem().invoke(() -> Log.info("B"))
.onFailure().invoke(() -> Log.info("C"))
.onFailure().recoverWithItem(2)
.onItem().invoke(() -> Log.info("D"))
.onFailure().invoke(() -> Log.info("E"))
.subscribe().with(i -> Log.info(i));
recoverWithItem で例外を置き換えない限り、最後まで onFailure イベントが伝播していくようです。 特定の処理の例外のみキャッチしたい場合は、以下のように Uni を分けることで実現できます。
// B のみ実行される
Uni.createFrom().item(1)
.onItem().failWith(i -> new RuntimeException())
.chain(i -> Uni.createFrom().item(i)
.onItem().failWith(item -> new RuntimeException())
.onFailure().invoke(() -> Log.info("A")))
.onFailure().invoke(() -> Log.info("B"))
.onFailure().recoverWithItem(2);
.subscribe().with(i -> Log.info(i));
テストコードの書き方
ノンブロッキングの場合
Uni<Integer> uni = Uni.createFrom().item(1);
UniAssertSubscriber<Integer> subscriber = uni
.subscribe().withSubscriber(UniAssertSubscriber.create());
subscriber.assertCompleted().assertItem(1);
Uni の場合は UniAssertSubscriber
を使用します。 AssertSubscriber
は Multi 用です。
ブロックする場合
Uni<Integer> uni = Uni.createFrom().item(63);
Integer actual = uni.await().indefinitely();
assertEquals(63, uni);
ノンブロッキングの方がテスト速いんでしょうけど2未検証、ブロックする方が見た目すっきりするのでこちらを使ってしまいそうです。
複数の Uni を実行する場合
複数の処理 A B があるときに、順序は関係ない3並列で実行されるか、それとも直列で順序ありかはドキュメントに書かれていない模様がA B両方の結果が必要な場合は以下のように書けます。
Uni.combine().all().unis(uniA, uniB).asTuple()
.invoke(t -> {
var itemA = t.getItem1();
var itemB = t.getItem2();
});
結果を破棄したい場合
前の Uni の結果を破棄したい場合は以下のように書けます。
uniA.onItem().ignore().andContinueWithNull()
.chain(() -> uniB);
.onItem().ignore().andContinueWithNull()
は無くても chain の引数を使わなければいいだけですが、明示的に結果を破棄して Uni<Void>
型にしたい場合に利用します。
なぜか動かないとき
前述したとおり、 Uni は subscribe するまで実行されません。また subscribe した時に実行されるのは、 chain, call などによって数珠つなぎで接続された Uni のみです。
よくあるのは chain と map、 call と invoke を間違えているパターンです。 Uni が確実につながっているか確認してみましょう。
参考
更新
- 「複数の Uni を実行する場合」と「結果を破棄したい場合」を追記しました (2021/11/23)