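Sometimes a data-aggregation endpoint has to request a batch of URLs and can only build its response once all of them have returned. What are good ways to implement this?
JavaScript implementation
The first thing that comes to mind is Node's asynchronous requests combined with Promise: it is very convenient to implement and the performance is decent.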
// npm install --save request
// npm install --save request-promise
const rp = require('request-promise');
// JD.com product page URLs
const items = ['https://item.jd.com/6784496.html', 'https://item.jd.com/5089253.html', 'https://item.jd.com/5089273.html', 'https://item.jd.com/100000177760.html', 'https://item.jd.com/100000287113.html', 'https://item.jd.com/1861101.html', 'https://item.jd.com/3133847.html', 'https://item.jd.com/100000177748.html', 'https://item.jd.com/11794447957.html', 'https://item.jd.com/31545088844.html', 'https://item.jd.com/16580586466.html', 'https://item.jd.com/27424489997.html', 'https://item.jd.com/11464031106.html', 'https://item.jd.com/34803424704.html', 'https://item.jd.com/10889864876.html', 'https://item.jd.com/16580070299.html', 'https://item.jd.com/31544565956.html', 'https://item.jd.com/32943442167.html', 'https://item.jd.com/16580068432.html', 'https://item.jd.com/32242288360.html', 'https://item.jd.com/11357751613.html', 'https://item.jd.com/32943757456.html', 'https://item.jd.com/32943574369.html', 'https://item.jd.com/32858702835.html', 'https://item.jd.com/25836502018.html'];
// JD.com price JSON endpoint for each product
const urls = items.map((item) => {
  // Extract the SKU id from ".../<id>.html"
  const id = item.substring(item.lastIndexOf('/') + 1, item.length - 5);
  return `https://p.3.cn/prices/mgets?skuIds=J_${id}&type=1`;
});
console.time('time is');
// Send all HTTP requests concurrently to fetch the prices
Promise.all(urls.map(url => rp(url))).then((values) => {
  // Add up the prices
  const sum = values.reduce((acc, json) => acc + Number.parseFloat(JSON.parse(json)[0].p), 0);
  console.timeEnd('time is');
  console.log('sum is', sum);
}).catch((reason) => {
  console.error(reason);
});
The 25 network requests take just over 200 ms in total, which is decent performance overall.
node .\batch_fetch_test.js
time is: 213.554ms
sum is 130456
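Java thread pool implementation
Of course, Java developers have plenty of options as well.
The first one that comes to mind is a thread pool, right?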
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpMethod;
import org.springframework.web.client.RestTemplate;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class BatchFetchTest {
    private static final RestTemplate restTemplate = new RestTemplate();

    public static void main(String[] args) {
        String[] items = {
"https://item.jd.com/6784496.html", "https://item.jd.com/5089253.html", "https://item.jd.com/5089273.html", "https://item.jd.com/100000177760.html", "https://item.jd.com/100000287113.html", "https://item.jd.com/1861101.html", "https://item.jd.com/3133847.html", "https://item.jd.com/100000177748.html", "https://item.jd.com/11794447957.html", "https://item.jd.com/31545088844.html", "https://item.jd.com/16580586466.html", "https://item.jd.com/27424489997.html", "https://item.jd.com/11464031106.html", "https://item.jd.com/34803424704.html", "https://item.jd.com/10889864876.html", "https://item.jd.com/16580070299.html", "https://item.jd.com/31544565956.html", "https://item.jd.com/32943442167.html", "https://item.jd.com/16580068432.html", "https://item.jd.com/32242288360.html", "https://item.jd.com/11357751613.html", "https://item.jd.com/32943757456.html", "https://item.jd.com/32943574369.html", "https://item.jd.com/32858702835.html", "https://item.jd.com/25836502018.html"
        };
        final int size = items.length;
        List<String> urls = Arrays.stream(items)
                .map(item -> String.format("https://p.3.cn/prices/mgets?skuIds=J_%s&type=1", item.substring(item.lastIndexOf('/') + 1, item.length() - 5)))
                .collect(Collectors.toList());
        // Initialize the thread pool up front
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(size, size, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(size), new ThreadPoolExecutor.AbortPolicy());
        // Prestart all core threads
        threadPool.prestartAllCoreThreads();
        // Separator line printed between runs
        final String separatorLine = String.join("", Collections.nCopies(10, "="));
        try {
            // Run the batch three times as a warm-up; the last run is the one that counts
            IntStream.rangeClosed(1, 3).forEach((x) -> {
                try {
                    fetch(urls, threadPool);
                    System.out.println(separatorLine);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            });
        } finally {
            // Shut the pool down even if a run fails, so its threads do not keep the JVM alive
            threadPool.shutdownNow();
        }
    }
    private static void fetch(List<String> urls, ThreadPoolExecutor threadPool) throws InterruptedException {
        // The CountDownLatch lets us wait until every concurrent request has finished
        CountDownLatch latch = new CountDownLatch(urls.size());
        // Send the requests
        long start = System.nanoTime();
        List<Future<List<Value>>> futureList = urls.stream()
                .map(url -> threadPool.submit(new FetchTask(url, latch)))
                .collect(Collectors.toList());
        latch.await();
        double sum = futureList.stream().mapToDouble(value -> {
            try {
                return value.get().get(0).getP();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }).sum();
        System.out.println("time is " + (System.nanoTime() - start) / (1000.00D * 1000.00D) + "ms");
        System.out.println("sum is " + sum);
    }
    private static class FetchTask implements Callable<List<Value>> {
        private final String url;
        private final CountDownLatch latch;

        FetchTask(String url, CountDownLatch latch) {
            this.url = url;
            this.latch = latch;
        }

        @Override
        public List<Value> call() {
            try {
                return restTemplate.exchange(url, HttpMethod.GET, null,
                        new ParameterizedTypeReference<List<Value>>() {
                        }).getBody();
            } finally {
                latch.countDown();
            }
        }
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    @JsonIgnoreProperties(ignoreUnknown = true)
    static class Value {
        private String id;
        private Double p;
    }
}
In my test this performed even better, haha.
time is 133.8036ms
sum is 130456.0
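As a side note, the latch is not strictly required: ExecutorService.invokeAll blocks until every submitted task has completed, and a lambda can stand in for a hand-written Callable class. Below is a minimal sketch of that variant, not the code that produced the timing above. The class name InvokeAllSketch, the sumPrices helper and the fetchPrice function (a stand-in for the real HTTP call) are assumptions made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

public class InvokeAllSketch {

    // Hypothetical helper: runs one task per id on the pool and sums the results.
    static double sumPrices(List<String> ids, Function<String, Double> fetchPrice, ExecutorService pool) {
        List<Callable<Double>> tasks = new ArrayList<>();
        for (String id : ids) {
            // A lambda stands in for a dedicated Callable implementation
            tasks.add(() -> fetchPrice.apply(id));
        }
        try {
            double sum = 0;
            // invokeAll returns only after every task has completed, so no latch is needed
            for (Future<Double> future : pool.invokeAll(tasks)) {
                sum += future.get();
            }
            return sum;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the batch", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a task in the batch failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            // Stand-in for the HTTP call: the "price" is simply the length of the id
            double sum = sumPrices(Arrays.asList("6784496", "5089253"), id -> (double) id.length(), pool);
            System.out.println("sum is " + sum);
        } finally {
            pool.shutdown();
        }
    }
}
```

In the real program, fetchPrice would wrap the restTemplate call and return the p field of the first element.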
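CompletableFuture implementation
That is a lot of code, though, isn't it? Having to implement the Callable interface is a hassle. If you are already on Java 8, congratulations: there is a class called CompletableFuture that handles this kind of requirement more neatly.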
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.junit.Before;
import org.junit.Test;
import org.springframework.web.client.RestTemplate;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;
public class CompletableFutureTest {
    private final String[] items = {
"https://item.jd.com/6784496.html", "https://item.jd.com/5089253.html", "https://item.jd.com/5089273.html", "https://item.jd.com/100000177760.html", "https://item.jd.com/100000287113.html", "https://item.jd.com/1861101.html", "https://item.jd.com/3133847.html", "https://item.jd.com/100000177748.html", "https://item.jd.com/11794447957.html", "https://item.jd.com/31545088844.html", "https://item.jd.com/16580586466.html", "https://item.jd.com/27424489997.html", "https://item.jd.com/11464031106.html", "https://item.jd.com/34803424704.html", "https://item.jd.com/10889864876.html", "https://item.jd.com/16580070299.html", "https://item.jd.com/31544565956.html", "https://item.jd.com/32943442167.html", "https://item.jd.com/16580068432.html", "https://item.jd.com/32242288360.html", "https://item.jd.com/11357751613.html", "https://item.jd.com/32943757456.html", "https://item.jd.com/32943574369.html", "https://item.jd.com/32858702835.html", "https://item.jd.com/25836502018.html"
    };
    private RestTemplate restTemplate;
    private Executor executor;

    @Before
    public void init() {
        restTemplate = new RestTemplate();
        executor = Executors.newFixedThreadPool(Math.min(items.length, 100), r -> {
            Thread t = new Thread(r);
            t.setDaemon(true);
            return t;
        });
    }

    @Test
    public void fetch() {
        IntStream.rangeClosed(1, 5).forEach(x -> {
            long start = System.nanoTime();
            double sum = Arrays.asList(items).parallelStream()
                    .map(item -> String.format("https://p.3.cn/prices/mgets?skuIds=J_%s&type=1", item.substring(item.lastIndexOf('/') + 1, item.length() - 5)))
                    .map(url -> CompletableFuture.supplyAsync(() -> restTemplate.getForObject(url, String.class), executor))
                    .map(CompletableFuture::join)
                    .map(this::parseJson)
                    .mapToDouble(m -> m.get(0).get("p").asDouble())
                    .sum();
            System.out.println("time is " + (System.nanoTime() - start) / (1000.00D * 1000.00D) + "ms");
            System.out.println("sum is " + sum);
        });
    }

    private List<ObjectNode> parseJson(String json) {
        try {
            return new ObjectMapper().readValue(json, new TypeReference<List<ObjectNode>>() {
            });
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
Let's look at the result:
time is 182.9303ms
sum is 130456.0
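One detail of the pipeline above: each future is created and joined within the same stream pipeline, so how many requests are actually in flight depends on how the parallel stream schedules its elements rather than on the executor alone (on a sequential stream the calls would simply run one after another). A common alternative is to submit every task first and join only afterwards. Here is a minimal sketch of that idea, assuming a hypothetical sumPrices helper and a fetchPrice function that stands in for the restTemplate call; the class name TwoPhaseJoinSketch is likewise made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;

public class TwoPhaseJoinSketch {

    // Hypothetical helper: starts one async task per id, then waits for all of them.
    static double sumPrices(List<String> ids, Function<String, Double> fetchPrice, ExecutorService executor) {
        // Phase 1: submit every task to the executor; nothing blocks here
        List<CompletableFuture<Double>> futures = ids.stream()
                .map(id -> CompletableFuture.supplyAsync(() -> fetchPrice.apply(id), executor))
                .collect(Collectors.toList());
        // Phase 2: join only after all tasks have been submitted
        return futures.stream().mapToDouble(CompletableFuture::join).sum();
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            // Stand-in for the HTTP call: the "price" is simply the length of the id
            double sum = sumPrices(Arrays.asList("6784496", "5089253", "1861101"), id -> (double) id.length(), executor);
            System.out.println("sum is " + sum);
        } finally {
            executor.shutdown();
        }
    }
}
```

With this shape, the number of concurrent requests is bounded by the size of the executor's thread pool, which is usually what a custom executor is there for.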
WebFlux implementation
Finally, is there any other way? Spring 5 introduced WebFlux, a non-blocking web framework that supports Node-style asynchronous requests well.
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.junit.Before;
import org.junit.Test;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
/**
* @author smilezhou
* @date 2019/5/24
*/
public class WebFluxTest {
    private WebClient webClient;
    private final String[] items = {
"https://item.jd.com/6784496.html", "https://item.jd.com/5089253.html", "https://item.jd.com/5089273.html", "https://item.jd.com/100000177760.html", "https://item.jd.com/100000287113.html", "https://item.jd.com/1861101.html", "https://item.jd.com/3133847.html", "https://item.jd.com/100000177748.html", "https://item.jd.com/11794447957.html", "https://item.jd.com/31545088844.html", "https://item.jd.com/16580586466.html", "https://item.jd.com/27424489997.html", "https://item.jd.com/11464031106.html", "https://item.jd.com/34803424704.html", "https://item.jd.com/10889864876.html", "https://item.jd.com/16580070299.html", "https://item.jd.com/31544565956.html", "https://item.jd.com/32943442167.html", "https://item.jd.com/16580068432.html", "https://item.jd.com/32242288360.html", "https://item.jd.com/11357751613.html", "https://item.jd.com/32943757456.html", "https://item.jd.com/32943574369.html", "https://item.jd.com/32858702835.html", "https://item.jd.com/25836502018.html"
    };

    @Before
    public void setUp() {
        webClient = WebClient.create("https://p.3.cn/prices");
    }

    @Test
    public void fetch() throws InterruptedException {
        final int count = 3;
        CountDownLatch latch = new CountDownLatch(count);
        doFetch(latch, count);
        latch.await();
    }

    private void doFetch(final CountDownLatch latch, final int count) {
        long start = System.nanoTime();
        Flux.fromArray(items)
                .map(item -> item.substring(item.lastIndexOf('/') + 1, item.length() - 5))
                .flatMap(id -> webClient.get()
                        .uri("/mgets?skuIds=J_{id}&type=1", id)
                        .retrieve()
                        .bodyToMono(String.class)
                        .map(this::parseJson)
                        .map(m -> m.get(0).get("p").asDouble()))
                .reduce(0D, (a, b) -> a + b)
                .subscribe(sum -> {
                    System.out.println("time is " + (System.nanoTime() - start) / (1000.00D * 1000.00D) + "ms");
                    System.out.println("sum is " + sum);
                    latch.countDown();
                    // Start the next round only after the current one has finished
                    if (count - 1 > 0) {
                        doFetch(latch, count - 1);
                    }
                }, error -> {
                    error.printStackTrace();
                    // No further rounds run after a failure, so release the waiting test thread
                    for (int i = 0; i < count; i++) {
                        latch.countDown();
                    }
                });
    }

    private List<ObjectNode> parseJson(String json) {
        try {
            return new ObjectMapper().readValue(json, new TypeReference<List<ObjectNode>>() {
            });
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
time is 69.352ms
sum is 130456.0
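As you can see, WebFlux gave the fastest result in this test.
Of course, WebFlux is not yet mature; in particular, asynchronous JDBC access is not supported, so weigh this carefully before using it in a project. It is up to the job for small projects, though. Here's hoping for a brighter future for WebFlux!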