php小编新一在解释"串联通量,其中第二个通量是与第一个通量的最后值即时创建的"时,可以简洁明了地解释这个概念。在串联通量中,第一个通量的值会传递给第二个通量,而第二个通量的值则是根据第一个通量的最后值即时生成的。这种机制可以用于实现动态的数据传递和处理,使得程序的流程更加灵活和高效。通过合理运用串联通量,可以提升程序的性能和可维护性,提供更好的用户体验。
我怀疑这肯定是重复的,但我只是在谷歌上搜索了错误的术语。
我有两个通量 a 和 b,但 b 只能使用 a 的最后一个值创建。
我想创建一个本质上是 a 和 b 的串联的通量,但 b 的创建会被推迟,直到我们获得 a 的最后一个值为止。
也许,它看起来像这样:
fluxC = fluxA.concatWith(lastA -> createFluxB(lastA))
我不知道库中是否有任何函数完全执行此操作。
但是,您可以通过以下方式制作这样的运算符:
注意:它不应该有太多开销,因为该操作一次只缓存一个值,算法的第二部分应该直接取回缓存的值,而不会触发反向源通量.
这是一个示例实现和测试:
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
public class TestConcatLast {
/**
* Create a stream that emit all elements from input flux,
* followed by all items from a flux created by provided function.
* Input function is triggered using last element of source flux as input.
*
* @param source The flux providing elements of the first part of the concatenation.
* @param createFromLastElement A function that provides the tail of the concatenation from a given element.
* It will be triggered <em>at most once</em> using the last element of input flux.
* @param errorIfSourceEmpty If true and input stream is empty, the returned flow will trigger an error.
* If false, an empty flux is produced if input is empty.
*/
public <T> Flux<T> concatLast(Flux<T> source, boolean errorIfSourceEmpty, Function<T, Flux<T>> createFromLastElement) {
var sourceWithLatestCached = source.cache(1);
final Mono<T> deferLast = Mono.defer(errorIfSourceEmpty ? sourceWithLatestCached::last : sourceWithLatestCached::next);
return sourceWithLatestCached.concatWith(
deferLast.flatMapMany(createFromLastElement)
);
}
@Test
public void testConcat() {
var nextExpectedElement = new AtomicInteger(1);
var elts = Flux.just(1, 2, 3, 4)
// Check cache works and no element has been fetched back from source
.doOnNext(i -> {
assert nextExpectedElement.compareAndSet(i, i+1);
});
var concatenated = concatLast(elts, true, i -> Flux.just(i + 1, i + 2, i + 3));
StepVerifier.create(concatenated)
.expectNext(1, 2, 3, 4, 5, 6, 7)
.verifyComplete();
}
}
以上就是串联通量,其中第二个通量是与第一个通量的最后值即时创建的?的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号