Predicate断言
Predicate接口
Predicate 断言
使用
Predicate<Integer> predicate = x -> x > 5;
Predicate<Integer> p = x -> x < 10;
System.err.println(predicate.and(p).test(8)); // true 10>8>5
System.err.println(predicate.or(p).test(11)); // true 11> 5 一真为真
System.err.println(predicate.negate().test(11)); // false 11 > 5 取反
// Predicate.isEqual方法返回的Predicate类型的test方法中的参数与targetRef做比较
System.err.println(Predicate.isEqual(1).test(1)); // true
System.err.println(Predicate.isEqual(1).test(2)); // false
对象的引用
方法引用的几种形式
- 类名::静态方法名
- 类名::实例方法名
- 对象::实例方法名
- 类型::new(构造方法的引用)
函数式接口
方法中接口作为参数的情况:
方法传递一个接口,这个接口有且只有一个方法,我们称为A方法,
B方法调用这个A方法时,这个接口参数即为这个方法参数,
可以如下简化
()->{return xxx}
箭头左边就是这个A方法需要传递的参数,右边为这个B方法返回的参数。
例如 方法中传递接口
public static <T> T handle(Invoker<T> invoker) {
T invoke(ReservationOperateDto obj);
};
调用的方法
public String saveHeadInfo(ReservationMerchantDto reservationMerchantDto) {
handle( (ReservationOperateDto res)->{return "string";})
}
Function<T, R>
——接收T
,返回R
// 函数式接口
/**
* @Author YIFan
* @Date 2023-5-13 14:41
* @Version 1.0
*/
public class FuncIfUtil {
public static ThrowExceptionFunction isTrue(Boolean bool) {
return message -> {
if (!bool) throw new RuntimeException(message);
};
}
public static NullBranchHandlerFunction<Object> isNull(Object obj) {
return (trueHandler, falseHandle) -> {
if (StringUtils.isEmpty(obj)) {
System.err.println("为空");
trueHandler.run();
} else {
System.out.println("不为空");
falseHandle.accept(obj);
}
};
}
}
@FunctionalInterface
public interface NullBranchHandlerFunction<C> {
void isNullHandler(Runnable trueHandler, Consumer<? super C> falseHandler);
}
@FunctionalInterface
public interface ThrowExceptionFunction {
void throwEx(String message);
default void throwEx2(String message,String a ){};
}
public class FunTest {
public static void main(String[] args) {
FuncIfUtil.isTrue(false).throwEx("报错了");
AtomicReference<String> a = new AtomicReference<>("");
FuncIfUtil.isNull("12221").isNullHandler(() -> {
a.set("null");
}, (consume) -> {
a.set(consume.toString());
});
System.err.println(a.get());
}
}
CompletableFuture异步编程
1、CompletableFuture创建
CompletableFuture 对象,任务会在默认的 ForkJoinPool 中异步执行。
//该方法用于执行具有返回值的任务,并在任务完成时返回结果。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 执行具有返回值的任务
return "任务结果";
});
//创建一个没有返回值的 CompletableFuture 对象
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
// 执行没有返回值的任务
});
//获取任务结果
join() 会阻塞当前线程,直到任务完成并返回结果。
get() 会阻塞当前线程,直到任务完成并返回结果。
与join() 方法不同的是,get() 方法会抛出InterruptedException和ExecutionException异常,需要进行异常处理。
2、异步回调方法
//然后申请 把上一个线程的结果“应用于”下一个线程的计算。相当于结果值的传递
thenApply(Function<? super T, ? extends U> fn) //同步执行的
thenApplyAsync //异步
//输入参数:上一阶段的任务结果类型为 T。
//返回值:新阶段的任务结果类型为 U。
//功能:对上一阶段的任务结果进行转换操作,并返回一个新的 CompletableFuture 对象。
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 42)
.thenApply(result -> result * 2)
.thenApply(result -> result + 1);
使用 thenApply()方法对上一阶段的结果进行转换,
将结果乘以 2,并将转换后的结果加 1。
每个 thenApply()方法都返回一个新的 CompletableFuture 对象,可以继续链式调用。
//然后消费
thenAccept(Consumer<? super T> action)
//输入参数:上一阶段的任务结果类型为 T。
//返回值:CompletableFuture,没有返回值。
//功能:对上一阶段的任务结果进行消费操作,没有返回值。消费后,后面就获取不到这个T的结果了
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 42)
.thenAccept(result -> System.out.println("任务结果:" + result));
//然后运行
thenRun(Runnable action)
////输入参数:无。
//返回值:CompletableFuture,没有返回值。
//功能:在上一阶段任务完成后执行给定的 Runnable 任务,没有输入参数和返回值。
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 42)
.thenRun(() -> System.out.println("任务执行完毕"));
//然后合并
thenCombine(CompletionStage<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn)
//输入参数:另一个 CompletionStage 对象和一个 BiFunction 函数,函数的输入参数分别为上一阶段的任务结果类型 T 和另一个 CompletionStage 对象的任务结果类型 U,函数的返回值类型为 V。
//返回值:新阶段的任务结果类型为 V。
//功能:当两个 CompletionStage 对象都完成时,将它们的任务结果传递给给定的 BiFunction 函数进行组合处理,并返回一个新的 CompletableFuture 对象。
//然后编写
thenCompose(Function<? super T, ? extends CompletionStage> fn)
//输入参数:一个 Function 函数,函数的输入参数为上一阶段的任务结果类型 T,函数的返回值为另一个 CompletionStage 对象。
//返回值:新阶段的任务结果类型为 U。
//功能:当上一阶段的任务完成后,将结果传递给给定的 Function 函数,该函数返回一个新的 CompletionStage 对象,新阶段的任务结果类型为 U。
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 10);
CompletableFuture<Integer> future2 = future1.thenCompose(result -> CompletableFuture.supplyAsync(() -> result * 2));
//所有都执行完
allOf(CompletableFuture<?>... cfs)
//输入参数:多个 CompletableFuture 对象。
//返回值:CompletableFuture,没有返回值。
//功能:等待所有给定的 CompletableFuture 对象都完成,返回一个新的 CompletableFuture 对象。
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 10);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 20);
CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2);
3、异常处理方法
exceptionally(Function<Throwable, ? extends T> fn)
//当 CompletableFuture 执行过程中发生异常时,使用指定的函数进行异常处理,并返回一个新的 CompletableFuture 对象
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("任务执行异常");
});
CompletableFuture<Integer> handledFuture = future.exceptionally(ex -> {
System.out.println("异常处理:" + ex.getMessage());
return 0; // 默认值
});
handle(BiFunction<? super T, Throwable, ? extends U> fn)
//当 CompletableFuture 执行完成时,使用指定的函数处理结果或异常,并返回一个新的 CompletableFuture 对象。
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 42);
CompletableFuture<String> handledFuture = future.handle((result, ex) -> {
if (ex != null) {
System.out.println("异常处理:" + ex.getMessage());
return "默认值";
} else {
return "结果:" + result;
}
});
Reactor 响应式
Reactor:一个强大的响应式编程框架
Mono:Reactor 框架中的一个核心组件(就像工厂生产的一种型号的汽车)
Reactor 是发动机,Mono 是发动机生产的一种汽车
Reactor 主要功能
- ✅ 异步处理:非阻塞式编程模型
- ✅ 背压支持(Backpressure):消费者控制生产者速度
- ✅ 丰富的操作符:map、filter、flatMap 等 200+ 操作符
- ✅ 线程调度:灵活控制任务执行线程
- ✅ 错误处理:强大的错误恢复机制
Reactor 框架
│
├── Mono
│
├── Flux
│
├── Schedulers // 线程调度器
│
├── Operators // 操作符 (map, filter, flatMap…)
│
└── Core Features // 核心功能 (背压、错误处理…)
Mono 响应式流类型
创建Mono
Mono<String> emptyMono = Mono.empty(); // 创建一个空的 Mono
Mono<String> neverMono = Mono.never(); // 创建一个永远不会发出信号的 Mono
Mono<String> mono = Mono.just("Hello"); // 从确定的值创建。用于创建一个立即包含确定值的 Mono。
Mono<String> deferredMono = Mono.fromSupplier(() -> "Hello"); // 延迟创建
Mono<String> error = Mono.error(new RuntimeException("出错啦!"));
// 从 CompletableFuture 转换
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "result");
Mono<String> fromFuture = Mono.fromFuture(future);
// 从 Callable 转换-->得到一个有返回值的Mono
Mono<String> fromCallable = Mono.fromCallable(() -> expensiveOperation());
// 从 Runnable 转换-->得到一个空的Mono
Mono<Void> fromRunnable = Mono.fromRunnable(() -> cleanupOperation());
subscribe
subscribe()
是触发整个响应式流开始执行的开关
//得到一个有返回值的Mono
Mono<String> mono = Mono.fromCallable(() -> {
System.out.println("执行线程: " + Thread.currentThread().getName());
return "Hello";
});
// 只有调用 subscribe(),上面的代码才会真正执行
mono.subscribe(result -> System.out.println("结果: " + result));
永远记住:在WebFlux中,你的工作是 return 流,而不是 subscribe 流。把订阅的权利交给框架。
如果你写的一个方法需要返回 Mono<T>
或 Flux<T>
,那么在这个方法内部就绝对不要调用 subscribe()
。订阅是调用者的权利和责任。
subscribeOn
subscribeOn()
告诉 Reactor 在哪个线程池执行源头的任务。它影响的是整个流的执行位置。
Mono<String> mono = Mono.fromCallable(() -> {
System.out.println("执行线程: " + Thread.currentThread().getName());
return "耗时操作结果";
}).subscribeOn(Schedulers.boundedElastic()); // 指定在弹性线程池执行
mono.subscribe(result -> System.out.println("结果: " + result));
特性 | subscribe() |
subscribeOn() |
---|---|---|
目的 | 触发执行 | 控制执行线程 |
调用时机 | 最后调用 | 中间操作 |
返回值 | Disposable (可取消) |
Mono<T> (新的流) |
调用次数 | 只能调用一次 | 可多次调用(但只有第一次有效) |
本质 | 消费者方法 | 操作符 |
flatMap
flatMap
用于异步转换:当你的转换操作本身返回另一个 Mono 或 Flux 时使用它
// 你拿到一份菜单(Mono)
Mono<String> result = Mono.just("start")
// flatMap:选择菜品后,厨师开始做菜(返回新的 Mono)
Mono<String> result = result.flatMap(str -> asyncStep1(str)) // 第一步
.flatMap(result1 -> asyncStep2(result1)) // 第二步
.flatMap(result2 -> asyncStep3(result2)) // 第三步
.flatMap(result3 -> asyncStep4(result3)); // 第四步
为什么有些flatMap不调用 subscribe?
情况一:在WebFlux框架中(最常见)
@RestController
public class MyController {
@GetMapping("/user/{id}")
public Mono<User> getUser(@PathVariable String id) {
return userRepository.findById(id) // 返回一个Mono<User>
.flatMap(user -> {
// 假设需要根据user id再去查另一个服务
return otherService.fetchUserDetails(user.getDetailId());
});
}
}
// 在这个例子中,你的确没有调用 .subscribe()。但是,Spring WebFlux框架替你调用了。
情况二:在测试代码中 StepVerifier
使用 StepVerifier
进行测试时,它内部会执行订阅操作。
java
@Test
void testFlatMap() {
Mono<String> source = Mono.just("hello");
Mono<String> result = source.flatMap(s -> Mono.just(s + " world"));
// StepVerifier.create() 会订阅给定的流
StepVerifier.create(result)
.expectNext("hello world")
.verifyComplete(); // verifyComplete() 隐含了订阅和断言
}
你不需要手动写 result.subscribe(...)
,因为 StepVerifier
替你做了。
Flux 响应式流类型
特性 | Mono (一个流的流) | Flux (多个流的流) |
---|---|---|
比喻 | 一个一次性任务(Task) | 一条生产线或事件流(Pipeline/Stream) |
例子 | 下单购买一件商品 | 监控一条生产线上的所有产品 |
意图 | 获取一个结果 | 处理一个序列 |
关注点 | 最终 outcome:成功?失败?结果是什么? | 过程与序列:下一个是什么?什么时候结束? |
类比Java | Future<T> 、Callable<T> |
Iterator<T> 、List<T> 的异步版 |
类比SQL | SELECT * FROM users WHERE id = 1 (返回一行) |
SELECT * FROM users (返回多行) |
Flux和Mono如何选择
当你需要编写一个返回响应式类型的方法时,问自己这个问题:
“我的方法预计是返回一个单一的结果,还是一个结果的序列?”
- 查询用户详情? -> 返回
Mono<User>
- 查询所有用户? -> 返回
Flux<User>
- 保存用户? -> 返回
Mono<User>
(保存后的实体) - 建立实时消息连接? -> 返回
Flux<Message>
(持续不断的消息流) - 计算总数? -> 返回
Mono<Integer>
- 监听点击事件? -> 返回
Flux<ClickEvent>
(事件流)
日夜颠倒头发少 ,单纯好骗恋爱脑 ,会背九九乘法表 ,下雨只会往家跑 ,搭讪只会说你好 ---- 2050781802@qq.com