When I previously called the ChatGPT API with okhttp-sse, handling errors from the endpoint felt awkward. After switching to WebFlux the code structure became much simpler, so I'm sharing the approach here.
1. Add the WebFlux dependency
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
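If you build with Gradle instead of Maven, the equivalent dependency (assuming the Spring Boot plugin/BOM manages the version) would look like:

```groovy
// Gradle equivalent; version is managed by the Spring Boot plugin/BOM
implementation 'org.springframework.boot:spring-boot-starter-webflux'
```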
2. A minimal streaming call
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.netty.http.client.HttpClient;
import reactor.netty.transport.ProxyProvider;

@Slf4j
public class WebFluxDemo {

    private static final String host = "https://api.openai.com/";
    private static final String uri = "v1/chat/completions";

    public static void main(String[] args) throws InterruptedException {
        // route through a local proxy (adjust host/port to your environment)
        HttpClient httpClient = HttpClient.create()
                .proxy(proxy -> proxy.type(ProxyProvider.Proxy.HTTP)
                        .host("127.0.0.1")
                        .port(1080));
        ReactorClientHttpConnector connector = new ReactorClientHttpConnector(httpClient);
        // build a WebClient with defaults
        WebClient client = WebClient.builder()
                .baseUrl(host)
                .clientConnector(connector)
                .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
                .build();
        // build the request body
        OpenAiRequest openAiRequest = new OpenAiRequest();
        openAiRequest.stream = true;
        openAiRequest.setTemperature(0.7);
        openAiRequest.setModel("gpt-3.5-turbo-16k");
        openAiRequest.setMax_tokens(256);
        List<GptMessage> messages = new ArrayList<>();
        GptMessage gptMessage = new GptMessage();
        gptMessage.setRole("system");
        gptMessage.setContent("You will role-play as a Female named 'lily'\\nCharacter information:Romantic,Flirty,Lover\\nCharacter background:U are most beautiful girl in school and like me");
        messages.add(gptMessage);
        GptMessage question = new GptMessage();
        question.setRole("user");
        question.setContent("hi");
        messages.add(question);
        openAiRequest.setMessages(messages);
        // send the POST and map each streamed chunk to its text delta
        Flux<String> response = client.post()
                .uri(uri)
                .contentType(MediaType.APPLICATION_JSON)
                .header("Authorization", "Bearer " + "your token")
                .bodyValue(JSON.toJSONString(openAiRequest))
                .retrieve()
                .bodyToFlux(String.class)
                .map(result -> {
                    // three cases: empty chunk, the [DONE] sentinel, or a JSON chunk with a delta
                    if (StringUtils.isEmpty(result)) {
                        return "";
                    } else if (result.equals(SseData.DONE.value())) {
                        return result;
                    } else {
                        OpenAISteamResponse openAISteamResponse = JSON.parseObject(result, OpenAISteamResponse.class);
                        String content = openAISteamResponse.getChoices().get(0).getDelta().getContent();
                        return StringUtils.isNotEmpty(content) ? content : "";
                    }
                });
        // subscribe and keep the main thread alive until the stream completes or fails
        CountDownLatch latch = new CountDownLatch(1);
        response.subscribe(
                content -> log.info("content [{}]", content),
                error -> {
                    log.error("stream failed", error);
                    latch.countDown();
                },
                latch::countDown);
        latch.await();
    }
}
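The `map(...)` step above only distinguishes three cases: an empty chunk, the `[DONE]` sentinel, and a JSON chunk carrying a text delta. A minimal plain-Java sketch of that branching, using a hypothetical `extractDelta` that stands in for the fastjson parsing so it runs without any dependencies:

```java
import java.util.List;
import java.util.stream.Collectors;

public class DeltaMappingSketch {
    static final String DONE = "[DONE]";

    // Stand-in for JSON.parseObject(...).getChoices().get(0).getDelta().getContent():
    // naively pulls the value of the first "content" field out of a chunk.
    static String extractDelta(String chunk) {
        int i = chunk.indexOf("\"content\":\"");
        if (i < 0) return "";
        int start = i + "\"content\":\"".length();
        int end = chunk.indexOf('"', start);
        return end < 0 ? "" : chunk.substring(start, end);
    }

    // Mirrors the map(...) lambda: empty -> "", [DONE] passes through, else the delta text.
    static String mapChunk(String result) {
        if (result == null || result.isEmpty()) return "";
        if (result.equals(DONE)) return result;
        String content = extractDelta(result);
        return content.isEmpty() ? "" : content;
    }

    public static void main(String[] args) {
        List<String> chunks = List.of(
                "{\"choices\":[{\"delta\":{\"content\":\"Hel\"}}]}",
                "{\"choices\":[{\"delta\":{\"content\":\"lo\"}}]}",
                "",
                DONE);
        String joined = chunks.stream().map(DeltaMappingSketch::mapChunk).collect(Collectors.joining());
        System.out.println(joined); // prints: Hello[DONE]
    }
}
```

In the real pipeline a downstream consumer would typically filter out the sentinel instead of concatenating it; it is kept here only to show that it passes through unchanged.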
3. Appendix
3.1 Handling rate limiting, insufficient balance, and other unexpected errors from OpenAI: you can fall back to an alternative provider, e.g.
.onErrorResume(e -> {
    log.error("event source failure openai {}", e.getMessage());
    return huggingFaceService.chatStreamFlux(dto);
})
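`onErrorResume` swaps the failed stream for a fallback publisher. For readers more familiar with `CompletableFuture`, the shape is similar to `exceptionallyCompose` (Java 12+). A plain-JDK sketch, where `callPrimary`/`callFallback` are hypothetical stand-ins for the OpenAI and HuggingFace calls:

```java
import java.util.concurrent.CompletableFuture;

public class FallbackSketch {
    // Hypothetical stand-in for the OpenAI call failing (rate limit, no balance, ...).
    static CompletableFuture<String> callPrimary() {
        return CompletableFuture.failedFuture(new RuntimeException("429 rate limited"));
    }

    // Hypothetical stand-in for huggingFaceService.chatStreamFlux(dto).
    static CompletableFuture<String> callFallback() {
        return CompletableFuture.completedFuture("fallback answer");
    }

    public static void main(String[] args) {
        // Same shape as onErrorResume: on failure, switch to the fallback source.
        String answer = callPrimary()
                .exceptionallyCompose(e -> callFallback())
                .join();
        System.out.println(answer); // prints: fallback answer
    }
}
```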
3.2 Flux vs. Mono in WebFlux
In one method returning a Mono, I had OpenAI produce multiple results separated by \n, yet the Mono always arrived as a single result. Debugging made the difference concrete: a Flux emits zero or more elements, while a Mono emits at most one. When the response body is decoded with bodyToFlux(String.class), Spring's string decoder splits the body on newlines and emits each line as its own element; with bodyToMono(String.class), the whole body is aggregated into a single value. So a list joined with newlines comes back as a Flux of lines, while a single JSON string comes back as one Mono value.
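The practical effect can be illustrated without Reactor: splitting a newline-delimited body yields many elements (the Flux case), while keeping it whole yields one aggregated value (the Mono case). A minimal sketch:

```java
import java.util.List;

public class FluxVsMonoSketch {
    public static void main(String[] args) {
        String body = "first line\nsecond line\nthird line";

        // bodyToFlux(String.class): the string decoder splits on newlines,
        // so the stream emits one element per line (0..N elements).
        List<String> fluxLike = List.of(body.split("\n"));
        System.out.println(fluxLike.size()); // prints: 3

        // bodyToMono(String.class): the whole body is aggregated into
        // at most one value; the newlines stay inside that single string.
        String monoLike = body;
        System.out.println(monoLike.equals(String.join("\n", fluxLike))); // prints: true
    }
}
```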
3.3 Entity classes
@Data
public class OpenAiRequest {
    public String model;
    public List<GptMessage> messages;
    public Double temperature;
    public Integer max_tokens;
    public Boolean stream;
    public List<GptFunction> functions;
    public String function_call;
}

@Data
public class GptMessage {
    private String role;
    private String content;
    private String name;
    private FunctionCall function_call;
}

@Data
public class FunctionCall {
    private String name;
    private String arguments;
}

@Data
public class GptFunction {
    private Object parameters;
    private String description;
    private String name;
}

public enum SseData {
    DONE("[DONE]"),
    ;

    private final String value;

    SseData(final String value) {
        this.value = value;
    }

    public String value() {
        return value;
    }
}

@Data
public class OpenAISteamResponse {
    private String id;
    private String object;
    private int created;
    private String model;
    private List<Choice> choices;
}

@Data
public class Choice {
    private int index;
    private GptMessage delta;
    private String finish_reason;
}
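For reference, each streamed chunk that OpenAISteamResponse maps is, to the best of my knowledge of the API at the time of writing, a JSON object of roughly this shape (the id and timestamp values below are illustrative):

```json
{
  "id": "chatcmpl-abc123",
  "object": "chat.completion.chunk",
  "created": 1694268190,
  "model": "gpt-3.5-turbo-16k",
  "choices": [
    { "index": 0, "delta": { "content": "Hel" }, "finish_reason": null }
  ]
}
```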