smilezhou666, posted on 2019-05-24 15:51, last updated on 2019-05-24 18:13

[Original] Batch-requesting URLs for data aggregation: several implementations


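Sometimes you need to build a data-aggregation endpoint: it has to request a batch of URLs and can only generate its response once all the results have been aggregated. What are some good ways to implement this?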

JavaScript implementation

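The first thing that comes to mind is Node's asynchronous requests. Combined with Promise, this is very convenient to implement, and it performs well too.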

// npm install --save request
// npm install --save request-promise
// (note: the request package was deprecated in 2020, after this post was written)
const rp = require('request-promise');
// JD.com product pages
const items = ['https://item.jd.com/6784496.html', 'https://item.jd.com/5089253.html', 'https://item.jd.com/5089273.html', 'https://item.jd.com/100000177760.html', 'https://item.jd.com/100000287113.html', 'https://item.jd.com/1861101.html', 'https://item.jd.com/3133847.html', 'https://item.jd.com/100000177748.html', 'https://item.jd.com/11794447957.html', 'https://item.jd.com/31545088844.html', 'https://item.jd.com/16580586466.html', 'https://item.jd.com/27424489997.html', 'https://item.jd.com/11464031106.html', 'https://item.jd.com/34803424704.html', 'https://item.jd.com/10889864876.html', 'https://item.jd.com/16580070299.html', 'https://item.jd.com/31544565956.html', 'https://item.jd.com/32943442167.html', 'https://item.jd.com/16580068432.html', 'https://item.jd.com/32242288360.html', 'https://item.jd.com/11357751613.html', 'https://item.jd.com/32943757456.html', 'https://item.jd.com/32943574369.html', 'https://item.jd.com/32858702835.html', 'https://item.jd.com/25836502018.html'];
// JD.com price endpoint (returns JSON)
const urls = items.map((item) => {
  const id = item.substring(item.lastIndexOf('/') + 1, item.length - 5);
  return `https://p.3.cn/prices/mgets?skuIds=J_${id}&type=1`;
});
console.time('time is');
// Send all HTTP requests concurrently and collect the prices
Promise.all(urls.map(url => rp(url))).then((values) => {
  // Add up the prices
  const sum = values.reduce((acc, json) => acc + Number.parseFloat(JSON.parse(json)[0].p), 0);
  console.timeEnd('time is');
  console.log('sum is', sum);
}).catch((reason) => {
  console.error(reason);
});

25 network requests take only a little over 200 ms, which is pretty good overall.

node .\batch_fetch_test.js
time is: 213.554ms
sum is 130456

Java thread pool implementation

Of course, Java developers have plenty of options too.

The first thing that comes to mind is a thread pool, right?

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpMethod;
import org.springframework.web.client.RestTemplate;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class BatchFetchTest {
    private static final RestTemplate restTemplate = new RestTemplate();

    public static void main(String[] args) {
        String[] items = {
                "https://item.jd.com/6784496.html", "https://item.jd.com/5089253.html", "https://item.jd.com/5089273.html", "https://item.jd.com/100000177760.html", "https://item.jd.com/100000287113.html", "https://item.jd.com/1861101.html", "https://item.jd.com/3133847.html", "https://item.jd.com/100000177748.html", "https://item.jd.com/11794447957.html", "https://item.jd.com/31545088844.html", "https://item.jd.com/16580586466.html", "https://item.jd.com/27424489997.html", "https://item.jd.com/11464031106.html", "https://item.jd.com/34803424704.html", "https://item.jd.com/10889864876.html", "https://item.jd.com/16580070299.html", "https://item.jd.com/31544565956.html", "https://item.jd.com/32943442167.html", "https://item.jd.com/16580068432.html", "https://item.jd.com/32242288360.html", "https://item.jd.com/11357751613.html", "https://item.jd.com/32943757456.html", "https://item.jd.com/32943574369.html", "https://item.jd.com/32858702835.html", "https://item.jd.com/25836502018.html"
        };
        final int size = items.length;
        List<String> urls = Arrays.stream(items).map(item -> String.format("https://p.3.cn/prices/mgets?skuIds=J_%s&type=1", item.substring(item.lastIndexOf('/') + 1, item.length() - 5)))
                .collect(Collectors.toList());

        // Initialize the thread pool up front
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(size, size, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(size), new ThreadPoolExecutor.AbortPolicy());
        // Start all core threads in advance
        threadPool.prestartAllCoreThreads();
        // Separator line printed between runs
        final String separatorLine = String.join("", Collections.nCopies(10, "="));

        // Run the batch three times as warm-up and take the last run as the result
        IntStream.rangeClosed(1, 3).forEach((x) -> {
            try {
                fetch(urls, threadPool);
                System.out.println(separatorLine);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });
        threadPool.shutdownNow();
    }

    private static void fetch(List<String> urls, ThreadPoolExecutor threadPool) throws InterruptedException {
        // Use a CountDownLatch to wait for the concurrent requests
        CountDownLatch latch = new CountDownLatch(urls.size());
        // Send the requests:
        long start = System.nanoTime();
        List<Future<List<Value>>> futureList = urls.stream().map(url -> threadPool.submit(new FetchTask(url, latch))).collect(Collectors.toList());
        latch.await();
        double sum = futureList.stream().mapToDouble(value -> {
            try {
                return value.get().get(0).getP();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }).sum();
        System.out.println("time is " + (System.nanoTime() - start) / (1000.00D * 1000.00D) + "ms");
        System.out.println("sum is " + sum);
    }


    private static class FetchTask implements Callable<List<Value>> {
        private String url;
        private CountDownLatch latch;

        FetchTask(String url, CountDownLatch latch) {
            this.url = url;
            this.latch = latch;
        }

        @Override
        public List<Value> call() {
            try {
                return restTemplate.exchange(url, HttpMethod.GET, null
                        , new ParameterizedTypeReference<List<Value>>() {
                        }
                ).getBody();
            } finally {
                latch.countDown();
            }
        }
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    @JsonIgnoreProperties(ignoreUnknown = true)
    static class Value {
        private String id;
        private Double p;
    }
}

In my test run this was even faster, haha.

time is 133.8036ms
sum is 130456.0
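
As a side note, the latch bookkeeping above can probably be dropped: `ExecutorService.invokeAll` already blocks until every submitted task has finished. Below is a minimal sketch of that idea, not the code measured above. `fakeFetchPrice` is a hypothetical stand-in that sleeps instead of calling the JD price endpoint, and the prices it returns are made up.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class InvokeAllSketch {
    // Hypothetical stand-in for the HTTP call: sleeps briefly, then returns a made-up price.
    static double fakeFetchPrice(int skuId) throws InterruptedException {
        Thread.sleep(50);
        return skuId * 1.5;
    }

    // Runs one task per id on its own thread and sums the results.
    // Assumes skuIds is non-empty (a fixed pool needs at least one thread).
    static double sumPrices(List<Integer> skuIds) {
        ExecutorService pool = Executors.newFixedThreadPool(skuIds.size());
        try {
            List<Callable<Double>> tasks = new ArrayList<>();
            for (int id : skuIds) {
                tasks.add(() -> fakeFetchPrice(id));
            }
            double sum = 0;
            // invokeAll returns only after every task has completed,
            // so no CountDownLatch is needed here.
            for (Future<Double> future : pool.invokeAll(tasks)) {
                sum += future.get();
            }
            return sum;
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // 1.5 + 3.0 + 4.5 + 6.0
        System.out.println("sum is " + sumPrices(Arrays.asList(1, 2, 3, 4))); // prints "sum is 15.0"
    }
}
```

The tasks here are plain lambdas, so a separate `Callable` class is not strictly required either.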

CompletableFuture implementation

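But that code is rather long, isn't it? Having to implement the Callable interface is a hassle. If you are already on Java 8, congratulations: there is a class called CompletableFuture that handles this kind of requirement more neatly.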

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.junit.Before;
import org.junit.Test;
import org.springframework.web.client.RestTemplate;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;


public class CompletableFutureTest {
    private final String[] items = {
            "https://item.jd.com/6784496.html", "https://item.jd.com/5089253.html", "https://item.jd.com/5089273.html", "https://item.jd.com/100000177760.html", "https://item.jd.com/100000287113.html", "https://item.jd.com/1861101.html", "https://item.jd.com/3133847.html", "https://item.jd.com/100000177748.html", "https://item.jd.com/11794447957.html", "https://item.jd.com/31545088844.html", "https://item.jd.com/16580586466.html", "https://item.jd.com/27424489997.html", "https://item.jd.com/11464031106.html", "https://item.jd.com/34803424704.html", "https://item.jd.com/10889864876.html", "https://item.jd.com/16580070299.html", "https://item.jd.com/31544565956.html", "https://item.jd.com/32943442167.html", "https://item.jd.com/16580068432.html", "https://item.jd.com/32242288360.html", "https://item.jd.com/11357751613.html", "https://item.jd.com/32943757456.html", "https://item.jd.com/32943574369.html", "https://item.jd.com/32858702835.html", "https://item.jd.com/25836502018.html"
    };
    private RestTemplate restTemplate;
    private Executor executor;

    @Before
    public void init() {
        restTemplate = new RestTemplate();
        executor = Executors.newFixedThreadPool(Math.min(items.length, 100), r -> {
            Thread t = new Thread(r);
            t.setDaemon(true);
            return t;
        });
    }

    @Test
    public void fetch() {
        IntStream.rangeClosed(1, 5).forEach(x -> {
            long start = System.nanoTime();
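            // Start every request first and join afterwards. Joining inside the same
            // pipeline makes each stream worker block on its future right after creating
            // it, so concurrency is capped by the stream's worker threads, not the executor.
            List<CompletableFuture<String>> futures = Arrays.stream(items)
                    .map(item -> String.format("https://p.3.cn/prices/mgets?skuIds=J_%s&type=1", item.substring(item.lastIndexOf('/') + 1, item.length() - 5)))
                    .map(url -> CompletableFuture.supplyAsync(() -> restTemplate.getForObject(url, String.class), executor))
                    .collect(java.util.stream.Collectors.toList());
            double sum = futures.stream()
                    .map(CompletableFuture::join)
                    .map(this::parseJson)
                    .mapToDouble(m -> m.get(0).get("p").asDouble())
                    .sum();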
            System.out.println("time is " + (System.nanoTime() - start) / (1000.00D * 1000.00D) + "ms");
            System.out.println("sum is " + sum);
        });
    }

    private List<ObjectNode> parseJson(String json) {
        try {
            return new ObjectMapper().readValue(json, new TypeReference<List<ObjectNode>>() {
            });
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}

Let's look at the result:

time is 182.9303ms
sum is 130456.0
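
For readers coming from the Node version, `CompletableFuture.allOf` is probably the closest counterpart to `Promise.all`. The sketch below is only an illustration under stated assumptions: `fakeFetchPrice` is a hypothetical stand-in for the HTTP call, a negative id simulates a failed request, and treating a failed request as a price of 0 is just one possible policy for an aggregation endpoint.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

class AllOfSketch {
    // Hypothetical stand-in for the HTTP call; a negative id simulates a failed request.
    static double fakeFetchPrice(int skuId) {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        if (skuId < 0) {
            throw new IllegalArgumentException("unknown sku " + skuId);
        }
        return skuId * 2.0;
    }

    static double sumPrices(List<Integer> skuIds) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, skuIds.size()));
        try {
            // Phase 1: start every request; nothing blocks here.
            List<CompletableFuture<Double>> futures = skuIds.stream()
                    .map(id -> CompletableFuture
                            .supplyAsync(() -> fakeFetchPrice(id), pool)
                            // A failed request contributes 0 instead of failing the whole batch.
                            .exceptionally(ex -> 0.0))
                    .collect(Collectors.toList());
            // Phase 2: wait for all of them, then read the results.
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
            return futures.stream().mapToDouble(CompletableFuture::join).sum();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // 2.0 + 4.0 + 0.0 (failed request) + 6.0
        System.out.println("sum is " + sumPrices(Arrays.asList(1, 2, -1, 3))); // prints "sum is 12.0"
    }
}
```

Without the `exceptionally` step, a single failed request would make `join` throw and the whole batch would fail, much like a rejected promise in `Promise.all`.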

WebFlux implementation

Finally, is there any other way? Spring 5 introduced WebFlux, a non-blocking web framework that supports Node-style asynchronous requests well.

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.junit.Before;
import org.junit.Test;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * @author smilezhou
 * @date 2019/5/24
 */
public class WebFluxTest {
    private WebClient webClient;
    private final String[] items = {
            "https://item.jd.com/6784496.html", "https://item.jd.com/5089253.html", "https://item.jd.com/5089273.html", "https://item.jd.com/100000177760.html", "https://item.jd.com/100000287113.html", "https://item.jd.com/1861101.html", "https://item.jd.com/3133847.html", "https://item.jd.com/100000177748.html", "https://item.jd.com/11794447957.html", "https://item.jd.com/31545088844.html", "https://item.jd.com/16580586466.html", "https://item.jd.com/27424489997.html", "https://item.jd.com/11464031106.html", "https://item.jd.com/34803424704.html", "https://item.jd.com/10889864876.html", "https://item.jd.com/16580070299.html", "https://item.jd.com/31544565956.html", "https://item.jd.com/32943442167.html", "https://item.jd.com/16580068432.html", "https://item.jd.com/32242288360.html", "https://item.jd.com/11357751613.html", "https://item.jd.com/32943757456.html", "https://item.jd.com/32943574369.html", "https://item.jd.com/32858702835.html", "https://item.jd.com/25836502018.html"
    };

    @Before
    public void setUp() {
        webClient = WebClient.create("https://p.3.cn/prices");
    }

    @Test
    public void fetch() throws InterruptedException {
        final int count = 3;
        CountDownLatch latch = new CountDownLatch(count);
        doFetch(latch, count);
        latch.await();
    }

    private void doFetch(final CountDownLatch latch, final int count) {
        long start = System.nanoTime();
        Flux.fromArray(items)
                .map(item -> item.substring(item.lastIndexOf('/') + 1, item.length() - 5))
                .flatMap(id -> webClient.get().uri("/mgets?skuIds=J_{id}&type=1", id).retrieve().bodyToMono(String.class).map(this::parseJson).map(m -> m.get(0).get("p").asDouble()))
                .reduce(0D, (a, b) -> a + b)
                .subscribe(sum -> {
                    System.out.println("time is " + (System.nanoTime() - start) / (1000.00D * 1000.00D) + "ms");
                    System.out.println("sum is " + sum);
                    latch.countDown();
                    if (count - 1 > 0) {
                        doFetch(latch, count - 1);
                    }
                });
    }

    private List<ObjectNode> parseJson(String json) {
        try {
            return new ObjectMapper().readValue(json, new TypeReference<List<ObjectNode>>() {
            });
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}

The result:

time is 69.352ms
sum is 130456.0

As you can see, WebFlux gave the fastest result in these runs.

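That said, WebFlux is not yet mature at the time of writing. In particular, it has no support for asynchronous JDBC access, so weigh this up before adopting it in a project. It is perfectly adequate for small projects. Here's to a brighter future for WebFlux!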
