V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
bleulucaswu
V2EX  ›  程序员

reactive 编程+redisson 如何分页的问题

  •  
  •   bleulucaswu · 4 天前 · 812 次点击
    • SpringBoot WebFlux 3.1.0 + redisson 3.27.1 + redis stack 7 单机
    • webflux control 要求业务代码返回 mono
        @Bean
        public RouterFunction<ServerResponse> route(CacheHandler cacheHandler) {
            return RouterFunctions.route()
                    .GET("......", cacheHandler::xxxxxxx)
                    .build();
    
    • redissonReactiveClient 是 client

      • redissonReactiveClient.getSearch(codec).search(.....); 查询 redis 返回 mono<SearchResult>
      • 所以 cacheHandler::xxxxxxx 中 call 上面的查询
      • 如果做分页的话,SearchResult 里面有 total 和 List<Document>当前页数据
    • 但是我如何在一个请求中查出所有数据呢,也就是 reactive 编程怎么搞个循环查询啊

      • 只能将第一次查询的 mono block()拿到 total 吗,但是貌似 reactive 编程不该是这个写法...
      • 如果 Mono<SearchResult> cc = client.search(..第一页数据).cache();
        • 然后 return cc.flatMap(c -> client.search(.. c.getTotal, 然后传参剩下所有数据)).concatWith(cc).collectList();
        • 测试之后返回空的,因为 client.search 本身也是异步多线程的,这个操作在另一个线程进行,webflux 就给我空数据了,还有就是第二个 search 也拿不到真正的 total

    所以这个该咋做啊 -

    11 条回复    2024-10-15 07:23:04 +08:00
    bleulucaswu
        1
    bleulucaswu  
    OP
       4 天前
    要做分页的原因是 redis search 这个是强制分页的 (ft.search index *)就你查询默认只给你 10 个
    所以必须 (ft.search index * 0 100) 就是必须给个 offset count 参数
    ZGame
        2
    ZGame  
       4 天前
    请求所有数据可以通过 merge ,zip 等操作符吧, 另外可以看看 hs-web ,用的就是响应式。响应式更多时候有点像函数式编程,看看你的操作能不能通过函数式的方式,关注入参出参,还有函数转换方法,尽量减少副作用和外部的变量直接传入,so.. 其实你会发现用不好比用命令式的方式写代码更难受
    OctVan
        3
    OctVan  
       4 天前
    https://projectreactor.io/docs/core/release/reference/#sinks
    可以用这个一直翻页,直到取完数据
    spkingr
        4
    spkingr  
       4 天前
    遇事不决问 AI ,这种问题问 AI 最好,他都能给你写好。
    编程式风格确实不推荐使用 for 循环,用 for 也是配合 yield 。
    你这个 flatMap 按理来说应该是可以的,我复制你的问题给 AI ,AI 就简单的给了一个参考代码,你可以看看,看能不能改改用上:

    public Mono<SearchResult> searchWithPagination(int pageNumber, int pageSize) {
    return redissonReactiveClient.getSearch(StringCodec.INSTANCE)
    .search(SearchArgs(pageNumber, pageSize)) // 返回是 Mono<SearchResult>,有条数和页数吧
    .flatMap(initialSearchResult -> { // flatMap 抽取结果
    int totalPages = initialSearchResult.getTotal() / pageSize; // 修改一下,获取页数
    if (pageNumber < totalPages) {
    // 这里继续查询并合并
    return redissonReactiveClient.getSearch(StringCodec.INSTANCE)
    .search(SearchArgs(pageNumber, pageSize))
    // map 则是转换结果
    .map(nextSearchResult -> combineSearchResults(initialSearchResult, nextSearchResult));
    } else {
    // 没有数据
    return Mono.just(initialSearchResult);
    }
    });
    }

    private SearchResult combineSearchResults(SearchResult initialResult, SearchResult nextResult) {
    List<Document> combinedDocuments = new ArrayList<>(initialResult.getDocuments());
    combinedDocuments.addAll(nextResult.getDocuments());
    SearchResult combinedResult = new SearchResult();
    combinedResult.setTotal(initialResult.getTotal());
    combinedResult.setDocuments(combinedDocuments);
    return combinedResult;
    }
    spkingr
        5
    spkingr  
       4 天前
    可以用 MD 吗:

    ```
    public Mono<SearchResult> searchWithPagination(int pageNumber, int pageSize) {
    return redissonReactiveClient.getSearch(StringCodec.INSTANCE)
    .search(SearchArgs(pageNumber, pageSize)) // 返回是 Mono<SearchResult>,有条数和页数吧
    .flatMap(initialSearchResult -> { // flatMap 抽取结果
    int totalPages = initialSearchResult.getTotal() / pageSize; // 修改一下,获取页数
    if (pageNumber < totalPages) {
    // 这里继续查询并合并
    return redissonReactiveClient.getSearch(StringCodec.INSTANCE)
    .search(SearchArgs(pageNumber, pageSize))
    // map 则是转换结果
    .map(nextSearchResult -> combineSearchResults(initialSearchResult, nextSearchResult));
    } else {
    // 没有数据
    return Mono.just(initialSearchResult);
    }
    });
    }

    private SearchResult combineSearchResults(SearchResult initialResult, SearchResult nextResult) {
    List<Document> combinedDocuments = new ArrayList<>(initialResult.getDocuments());
    combinedDocuments.addAll(nextResult.getDocuments());
    SearchResult combinedResult = new SearchResult();
    combinedResult.setTotal(initialResult.getTotal());
    combinedResult.setDocuments(combinedDocuments);
    return combinedResult;
    }

    ```
    yazinnnn0
        6
    yazinnnn0  
       3 天前
    这个效果?


    用的 mutiny, 跟 reactor 差不多
    拿到 total 的 mono 时做 flatmap 转换, 然后把你的循环查询转换称序列+步进(都用 fp 了就别用传统 for 循环了...
    然后把序列做 fold 操作, java 里貌似需要用 reduce 去实现, fold 的初始值设置为 mono 形式的零值, 比如 Mono.just(List.empty())之类, fold 的 operation 去合并 acc 和新查出来的值(mutiny 用 combine, reactor 里应该有类似的 api)
    bleulucaswu
        7
    bleulucaswu  
    OP
       3 天前
    @spkingr 这样是没错 可是如果 search 操作不止就两次呢 就不止向 redis 请求两次 我主要想请教下 reative 风格的编程中 循环 x 操作该怎么做 且 x 是异步执行在循环之外的一个线程中 然后 Xn 的部分参数和 Xn-1 有关 然后就是如果 Xn 触发了某个条件 如何让 Xn+1 Xn+2 ... 全部取消 X1~Xn 都是异步无前后顺序执行的 该怎么做到有序的取消操作
    bleulucaswu
        8
    bleulucaswu  
    OP
       3 天前
    @yazinnnn0 兄弟 图裂了
    spkingr
        9
    spkingr  
       2 天前
    @bleulucaswu 这个时候不得不提 CompletableFuture 大法好了!
    thenCombine/thenCompose/thenApplyAsync 这些方法很好用,建议参考这篇文章: https://tech.meituan.com/2022/05/12/principles-and-practices-of-completablefuture.html
    bleulucaswu
        10
    bleulucaswu  
    OP
       2 小时 58 分钟前
    @yazinnnn0 请问下 如果 fold 里每次 search 操作都在一个额外的线程中进行,可以做到把结果 combine 到一起吗
    然后你这是 0..<20 step 5 来做循环的,假设 index=10 那次循环,因为触发了某个条件将它强制停止,有什么办法让 index=15 的那次循环以及之后所有的循环全部停止不做了(如果有的话),因为每次循环都是多线程乱序的,这能做到吗用 mutiny
    reactive 编程我还不熟悉 感觉用起来太难了
    bleulucaswu
        11
    bleulucaswu  
    OP
       2 小时 54 分钟前
    @yazinnnn0 就类似
    (1..<all step 5).fold(initial) { 多线程操作 }.until( index > 10 )
    就 index>10 的所有多线程里的操作都停掉,until 中肯定会有更复杂的判断条件,这里就举个例子
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   5859 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 34ms · UTC 02:17 · PVG 10:17 · LAX 19:17 · JFK 22:17
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.