import io.netty.buffer.ByteBufAllocator;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Publisher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.Ordered;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.http.HttpStatus;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.http.server.reactive.ServerHttpResponseDecorator;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.MultiValueMap;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilter;
import org.springframework.web.server.WebFilterChain;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import javax.annotation.PostConstruct;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * WebFlux filter that logs every HTTP exchange: the incoming request (query
 * parameters and, for non-GET requests, the cached request body), the outgoing
 * response body with status code and elapsed time, and any error signalled by
 * the downstream chain.
 *
 * <p>Response logging buffers the complete response body in memory before it is
 * forwarded, and only applies when the body publisher is a {@link Flux}.
 */
@Component
@Slf4j
public class LogFilter implements WebFilter, Ordered {

    /** Factory used to join the response chunks and to re-wrap the captured bytes. */
    private final NettyDataBufferFactory bufferFactory = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT);

    /**
     * Logs the request, decorates the response so its body is logged when written,
     * and logs errors emitted by the rest of the chain.
     *
     * @param exchange the current server exchange
     * @param chain    the remaining filter chain
     * @return completion signal of the downstream chain
     */
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
        final ServerHttpRequest request = exchange.getRequest();
        final String sid = SessionHolder.generateSessionId();
        final String url = WebHelper.getReqeustUri(request);
        final String method = request.getMethodValue();
        final String remoteAddr = WebHelper.getIpAddr(request);
        // NOTE(review): this map is never populated, so the request log always
        // receives "{}" for headers - confirm whether header logging was intended.
        final Map<String, Object> headers = new HashMap<>();
        final long reqTime = System.currentTimeMillis();

        // Historical note (2019-11-28): binding the session id to the current thread
        // via SessionHolder.useSessionId(...) was dropped here because asynchronous
        // execution caused tracing issues; the sid is passed explicitly instead.

        // Log the request.
        dump(remoteAddr, method, url, sid, headers.toString(), exchange);

        // Decorate the response so the body can be captured and logged on write.
        ServerHttpResponse originalResponse = exchange.getResponse();
        ServerHttpResponseDecorator decoratedResponse = new ServerHttpResponseDecorator(originalResponse) {
            @Override
            public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
                if (!(body instanceof Flux)) {
                    // Non-Flux bodies are passed through untouched (and not logged).
                    return super.writeWith(body);
                }
                Flux<? extends DataBuffer> fluxBody = (Flux<? extends DataBuffer>) body;
                return super.writeWith(fluxBody.buffer().map(dataBuffers -> {
                    // Read the raw bytes once. The String is only a view for the log;
                    // the original bytes are what gets written downstream, so binary
                    // or non-UTF-8 payloads are forwarded unchanged.
                    byte[] content = readBody(dataBuffers);
                    String responseData = new String(content, StandardCharsets.UTF_8);
                    long costTime = System.currentTimeMillis() - reqTime;
                    HttpStatus httpStatus = getStatusCode();
                    // -1 marks a response whose status code has not been set.
                    Integer respCode = httpStatus == null ? -1 : httpStatus.value();
                    ServerLogger.logHttpResp(new ServerLogger.LogHttpResp(remoteAddr, method, url, sid, costTime, respCode, responseData, false));
                    return bufferFactory.wrap(content);
                }));
            }
        };

        return chain.filter(exchange.mutate().request(request).response(decoratedResponse).build()).doOnError(
                error -> {
                    // NOTE(review): `logWrite` is not declared in the visible part of this
                    // class; presumably an injected collaborator - confirm.
                    if (logWrite.pringLog(url, null)) {
                        ServerLogger.logHttpErr(new ServerLogger.LogHttpErr(remoteAddr, method, url, sid, error.getMessage()));
                    }
                });
    }

    /**
     * @return {@link Ordered#LOWEST_PRECEDENCE}
     */
    @Override
    public int getOrder() {
        return LOWEST_PRECEDENCE;
    }

    /**
     * Renders the request's query parameters as {@code key=value} pairs joined by
     * {@code &}. Only the first value of a multi-valued parameter is included
     * (the parameters are flattened with {@code toSingleValueMap()}).
     *
     * @param request the request whose query parameters are rendered
     * @return the rendered parameters, or {@code null} when there are none
     */
    private String dumpParam(final ServerHttpRequest request) {
        MultiValueMap<String, String> multiValueMap = request.getQueryParams();
        Map<String, String> paramsMap = multiValueMap.toSingleValueMap();
        if (CollectionUtils.isEmpty(paramsMap)) {
            return null;
        }
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> entry : paramsMap.entrySet()) {
            // Separator goes between pairs, so there is no trailing '&' to trim.
            if (sb.length() > 0) {
                sb.append('&');
            }
            sb.append(entry.getKey()).append('=').append(entry.getValue());
        }
        return sb.toString();
    }

    /**
     * Joins the given buffers and copies their readable bytes into a new array.
     * The joined buffer is released afterwards.
     *
     * <p>The chunks are joined before any decoding takes place because decoding
     * each chunk on its own produced garbled text.
     *
     * @param dataBuffers the response chunks, in order
     * @return the concatenated bytes of all chunks
     */
    private byte[] readBody(List<? extends DataBuffer> dataBuffers) {
        DataBuffer joined = bufferFactory.join(dataBuffers);
        try {
            byte[] content = new byte[joined.readableByteCount()];
            joined.read(content);
            return content;
        } finally {
            DataBufferUtils.release(joined);
        }
    }

    /**
     * Logs the incoming request. GET requests are logged with their query
     * parameters only; every other request additionally logs the body stored under
     * {@code SystemConstants.CACHED_REQ_BODY} and then removes that attribute from
     * the exchange.
     *
     * @param remoteAddr client address
     * @param method     HTTP method name
     * @param url        request URI
     * @param sid        session id used to correlate log entries
     * @param headers    rendered request headers
     * @param exchange   the current server exchange
     */
    private void dump(String remoteAddr, String method, String url, String sid, String headers, ServerWebExchange exchange) {
        final ServerLogger.LogHttpReq logHttpReq = new ServerLogger.LogHttpReq(remoteAddr, method, url, sid, headers);
        ServerHttpRequest request = exchange.getRequest();
        String reqParam = dumpParam(request);
        logHttpReq.setReqParam(reqParam == null ? "" : reqParam);

        // Compare on the method name: getMethod() can be null when the method does
        // not resolve to an HttpMethod, and switching on null throws.
        if ("GET".equals(request.getMethodValue())) {
            ServerLogger.logHttpReq(logHttpReq);
            return;
        }

        // The request body is expected to have been cached on the exchange
        // beforehand (see CacheReqBodyFilter#filter).
        Object cachedBody = exchange.getAttribute(SystemConstants.CACHED_REQ_BODY);
        logHttpReq.setReqBody(cachedBody == null ? "" : cachedBody.toString());
        ServerLogger.logHttpReq(logHttpReq);
        // Drop the cached body once it has been logged.
        exchange.getAttributes().remove(SystemConstants.CACHED_REQ_BODY);
    }
}
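// "User comments" - appears to be web-page residue from where this file was copied; not part of the Java source.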