响应式编程

Predicate断言

Predicate接口

在这里插入图片描述

Predicate 断言

在这里插入图片描述

使用

Predicate<Integer> predicate = x -> x > 5;
Predicate<Integer> p = x -> x < 10;
System.err.println(predicate.and(p).test(8)); // true 10>8>5
System.err.println(predicate.or(p).test(11)); // true 11> 5 一真为真
System.err.println(predicate.negate().test(11)); // false 11 > 5 取反
// Predicate.isEqual方法返回的Predicate类型的test方法中的参数与targetRef做比较
System.err.println(Predicate.isEqual(1).test(1)); // true
System.err.println(Predicate.isEqual(1).test(2)); // false

对象的引用

方法引用的几种形式

  • 类名::静态方法名
  • 类名::实例方法名
  • 对象::实例方法名
  • 类型::new(构造方法的引用)

函数式接口

方法中接口作为参数的情况:

方法传递一个接口,这个接口有且只有一个方法,我们称为A方法,
B方法调用这个A方法时,这个接口参数即为这个方法参数,
可以如下简化
()->{return xxx}
箭头左边就是这个A方法需要传递的参数,右边为这个B方法返回的参数。


例如 方法中传递接口
 public static <T> T handle(Invoker<T> invoker) {
   T invoke(ReservationOperateDto obj);
 };
 
调用的方法
 public String saveHeadInfo(ReservationMerchantDto reservationMerchantDto) {
   handle( (ReservationOperateDto res)->{return "string";})
 
 }
  • Function<T, R>——接收 T,返回 R

image-20230403145621263

// 函数式接口
/**
 * @Author YIFan
 * @Date 2023-5-13 14:41
 * @Version 1.0
 */
public class FuncIfUtil {

    public static ThrowExceptionFunction isTrue(Boolean bool) {
        return message -> {
            if (!bool) throw new RuntimeException(message);
        };
    }

    public static NullBranchHandlerFunction<Object> isNull(Object obj) {
        return (trueHandler, falseHandle) -> {
            if (StringUtils.isEmpty(obj)) {
                System.err.println("为空");
                trueHandler.run();
            } else {
                System.out.println("不为空");
                falseHandle.accept(obj);
            }
        };
    }
}
@FunctionalInterface
public interface NullBranchHandlerFunction<C> {
    void isNullHandler(Runnable trueHandler, Consumer<? super C> falseHandler);
}

@FunctionalInterface
public interface ThrowExceptionFunction {
    void throwEx(String message);
    default void throwEx2(String message,String a ){};
}
public class FunTest {

    public static void main(String[] args) {

        FuncIfUtil.isTrue(false).throwEx("报错了");

        AtomicReference<String> a = new AtomicReference<>("");
        FuncIfUtil.isNull("12221").isNullHandler(() -> {
            a.set("null");
        }, (consume) -> {
            a.set(consume.toString());
        });
        System.err.println(a.get());
    }
}

CompletableFuture异步编程

1、CompletableFuture创建

CompletableFuture 对象,任务会在默认的 ForkJoinPool 中异步执行。

//该方法用于执行具有返回值的任务,并在任务完成时返回结果。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // 执行具有返回值的任务
    return "任务结果";
});
//创建一个没有返回值的 CompletableFuture 对象
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
    // 执行没有返回值的任务
});
//获取任务结果
join() 会阻塞当前线程,直到任务完成并返回结果。
get()  会阻塞当前线程,直到任务完成并返回结果。
    与join() 方法不同的是,get() 方法会抛出InterruptedException和ExecutionException异常,需要进行异常处理。

2、异步回调方法

//然后申请   把上一个线程的结果“应用于”下一个线程的计算。相当于结果值的传递
thenApply(Function<? super T, ? extends U> fn)  //同步执行的
thenApplyAsync //异步

//输入参数:上一阶段的任务结果类型为 T。
//返回值:新阶段的任务结果类型为 U。
//功能:对上一阶段的任务结果进行转换操作,并返回一个新的 CompletableFuture 对象。
    
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 42)
    .thenApply(result -> result * 2)
    .thenApply(result -> result + 1);

使用 thenApply()方法对上一阶段的结果进行转换,
    将结果乘以 2,并将转换后的结果加 1。
    每个 thenApply()方法都返回一个新的 CompletableFuture 对象,可以继续链式调用。
//然后消费 
thenAccept(Consumer<? super T> action)
    
//输入参数:上一阶段的任务结果类型为 T。
//返回值:CompletableFuture,没有返回值。
//功能:对上一阶段的任务结果进行消费操作,没有返回值。消费后,后面就获取不到这个T的结果了
    
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 42)
    .thenAccept(result -> System.out.println("任务结果:" + result));
//然后运行
thenRun(Runnable action)
        
////输入参数:无。
//返回值:CompletableFuture,没有返回值。
//功能:在上一阶段任务完成后执行给定的 Runnable 任务,没有输入参数和返回值。
    
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 42)
    .thenRun(() -> System.out.println("任务执行完毕"));
//然后合并
thenCombine(CompletionStage<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn)
    
//输入参数:另一个 CompletionStage 对象和一个 BiFunction 函数,函数的输入参数分别为上一阶段的任务结果类型 T 和另一个 CompletionStage 对象的任务结果类型 U,函数的返回值类型为 V。
//返回值:新阶段的任务结果类型为 V。
//功能:当两个 CompletionStage 对象都完成时,将它们的任务结果传递给给定的 BiFunction 函数进行组合处理,并返回一个新的 CompletableFuture 对象。
//然后编写
thenCompose(Function<? super T, ? extends CompletionStage> fn)

//输入参数:一个 Function 函数,函数的输入参数为上一阶段的任务结果类型 T,函数的返回值为另一个 CompletionStage 对象。
//返回值:新阶段的任务结果类型为 U。
//功能:当上一阶段的任务完成后,将结果传递给给定的 Function 函数,该函数返回一个新的 CompletionStage 对象,新阶段的任务结果类型为 U。
    
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 10);

CompletableFuture<Integer> future2 = future1.thenCompose(result -> CompletableFuture.supplyAsync(() -> result * 2));
//所有都执行完
allOf(CompletableFuture<?>... cfs)
    
//输入参数:多个 CompletableFuture 对象。
//返回值:CompletableFuture,没有返回值。
//功能:等待所有给定的 CompletableFuture 对象都完成,返回一个新的 CompletableFuture 对象。
    
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 10);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 20);

CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2);

3、异常处理方法

exceptionally(Function<Throwable, ? extends T> fn)
//当 CompletableFuture 执行过程中发生异常时,使用指定的函数进行异常处理,并返回一个新的 CompletableFuture 对象
    
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
    throw new RuntimeException("任务执行异常");
});

CompletableFuture<Integer> handledFuture = future.exceptionally(ex -> {
    System.out.println("异常处理:" + ex.getMessage());
    return 0; // 默认值
});
handle(BiFunction<? super T, Throwable, ? extends U> fn) 
//当 CompletableFuture 执行完成时,使用指定的函数处理结果或异常,并返回一个新的 CompletableFuture 对象。
    
    
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 42);

CompletableFuture<String> handledFuture = future.handle((result, ex) -> {
    if (ex != null) {
        System.out.println("异常处理:" + ex.getMessage());
        return "默认值";
    } else {
        return "结果:" + result;
    }
});

Reactor 响应式

Reactor:一个强大的响应式编程框架

Mono:Reactor 框架中的一个核心组件(就像工厂生产的一种型号的汽车)

Reactor 是发动机,Mono 是发动机生产的一种汽车

Reactor 主要功能

  • 异步处理:非阻塞式编程模型
  • 背压支持(Backpressure):消费者控制生产者速度
  • 丰富的操作符:map、filter、flatMap 等 200+ 操作符
  • 线程调度:灵活控制任务执行线程
  • 错误处理:强大的错误恢复机制

Reactor 框架

├── Mono // 0-1 个元素的流

├── Flux // 0-N 个元素的流

├── Schedulers // 线程调度器

├── Operators // 操作符 (map, filter, flatMap…)

└── Core Features // 核心功能 (背压、错误处理…)

Mono 响应式流类型

创建Mono

Mono<String> emptyMono = Mono.empty(); // 创建一个空的 Mono
Mono<String> neverMono = Mono.never(); // 创建一个永远不会发出信号的 Mono

Mono<String> mono = Mono.just("Hello"); // 从确定的值创建。用于创建一个立即包含确定值的 Mono。
Mono<String> deferredMono = Mono.fromSupplier(() -> "Hello"); // 延迟创建

Mono<String> error = Mono.error(new RuntimeException("出错啦!"));

// 从 CompletableFuture 转换
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "result");
Mono<String> fromFuture = Mono.fromFuture(future);

// 从 Callable 转换-->得到一个有返回值的Mono
Mono<String> fromCallable = Mono.fromCallable(() -> expensiveOperation());

// 从 Runnable 转换-->得到一个空的Mono
Mono<Void> fromRunnable = Mono.fromRunnable(() -> cleanupOperation());

subscribe

subscribe() 是触发整个响应式流开始执行的开关

//得到一个有返回值的Mono
Mono<String> mono = Mono.fromCallable(() -> {
    System.out.println("执行线程: " + Thread.currentThread().getName());
    return "Hello";
});

// 只有调用 subscribe(),上面的代码才会真正执行
mono.subscribe(result -> System.out.println("结果: " + result));

永远记住:在WebFlux中,你的工作是 return 流,而不是 subscribe 流。把订阅的权利交给框架。

如果你写的一个方法需要返回 Mono<T>Flux<T>,那么在这个方法内部就绝对不要调用 subscribe()。订阅是调用者的权利和责任。

subscribeOn

subscribeOn() 告诉 Reactor 在哪个线程池执行源头的任务。它影响的是整个流的执行位置。

Mono<String> mono = Mono.fromCallable(() -> {
    System.out.println("执行线程: " + Thread.currentThread().getName());
    return "耗时操作结果";
}).subscribeOn(Schedulers.boundedElastic()); // 指定在弹性线程池执行

mono.subscribe(result -> System.out.println("结果: " + result));
特性 subscribe() subscribeOn()
目的 触发执行 控制执行线程
调用时机 最后调用 中间操作
返回值 Disposable(可取消) Mono<T>(新的流)
调用次数 只能调用一次 可多次调用(但只有第一次有效)
本质 消费者方法 操作符

flatMap

flatMap 用于异步转换:当你的转换操作本身返回另一个 Mono 或 Flux 时使用它

// 你拿到一份菜单(Mono)
Mono<String> result = Mono.just("start")

// flatMap:选择菜品后,厨师开始做菜(返回新的 Mono)
Mono<String> result = result.flatMap(str -> asyncStep1(str)) // 第一步
    .flatMap(result1 -> asyncStep2(result1)) // 第二步
    .flatMap(result2 -> asyncStep3(result2)) // 第三步
    .flatMap(result3 -> asyncStep4(result3)); // 第四步

为什么有些flatMap不调用 subscribe?

情况一:在WebFlux框架中(最常见)

@RestController
public class MyController {

    @GetMapping("/user/{id}")
    public Mono<User> getUser(@PathVariable String id) {
        return userRepository.findById(id) // 返回一个Mono<User>
                .flatMap(user -> {
                    // 假设需要根据user id再去查另一个服务
                    return otherService.fetchUserDetails(user.getDetailId());
                });
    }
}

// 在这个例子中,你的确没有调用 .subscribe()。但是,Spring WebFlux框架替你调用了。

情况二:在测试代码中 StepVerifier

使用 StepVerifier 进行测试时,它内部会执行订阅操作。

java

@Test
void testFlatMap() {
    Mono<String> source = Mono.just("hello");
    Mono<String> result = source.flatMap(s -> Mono.just(s + " world"));

    // StepVerifier.create() 会订阅给定的流
    StepVerifier.create(result)
                .expectNext("hello world")
                .verifyComplete(); // verifyComplete() 隐含了订阅和断言
}

你不需要手动写 result.subscribe(...),因为 StepVerifier 替你做了。

Flux 响应式流类型

特性 Mono (一个流的流) Flux (多个流的流)
比喻 一个一次性任务(Task) 一条生产线或事件流(Pipeline/Stream)
例子 下单购买一件商品 监控一条生产线上的所有产品
意图 获取一个结果 处理一个序列
关注点 最终 outcome:成功?失败?结果是什么? 过程与序列:下一个是什么?什么时候结束?
类比Java Future<T>Callable<T> Iterator<T>List<T> 的异步版
类比SQL SELECT * FROM users WHERE id = 1 (返回一行) SELECT * FROM users (返回多行)

Flux和Mono如何选择

当你需要编写一个返回响应式类型的方法时,问自己这个问题:

“我的方法预计是返回一个单一的结果,还是一个结果的序列?”

  • 查询用户详情? -> 返回 Mono<User>
  • 查询所有用户? -> 返回 Flux<User>
  • 保存用户? -> 返回 Mono<User>(保存后的实体)
  • 建立实时消息连接? -> 返回 Flux<Message>(持续不断的消息流)
  • 计算总数? -> 返回 Mono<Integer>
  • 监听点击事件? -> 返回 Flux<ClickEvent>(事件流)

日夜颠倒头发少 ,单纯好骗恋爱脑 ,会背九九乘法表 ,下雨只会往家跑 ,搭讪只会说你好 ---- 2050781802@qq.com

×

喜欢就点赞,疼爱就打赏

相册 说点什么 简历