// Calls returned by ChatSteam.request; requestStreamRequest appends one entry per started request.
// NOTE(review): presumably kept so in-flight requests can be cleaned up when finished — confirm against the rest of the class.
private var streamingRequestCalls = ArrayList<Call>()
/**
 * Starts a streaming request and relays its progress to [bufferCallback].
 *
 * Stream events are not delivered directly: they are appended to a per-request queue,
 * and a coroutine launched on [Dispatchers.Main] drains that queue via
 * [responseStreamCallsSync], pacing delivery by the delay configured for the track type.
 *
 * @param streamRequest request payload passed through to `ChatSteam.request`.
 * @param streamingTrack track describing this request; it seeds the queue as the initial
 *   `STATE_REQUEST` entry and its `type` selects the pacing delay.
 * @param bufferCallback receives the track for every queued event.
 */
fun requestStreamRequest(streamRequest: StreamRequest, streamingTrack: StreamingTrack, bufferCallback:(StreamingTrack)->Unit) {
    Loger.d(Loger.TAG_CHAT_GPT, "request-start: $streamingTrack")

    // Queue of buffered stream events, seeded with the initial "request" state.
    // All entries of this request share one id taken from the wall clock.
    val requestId = System.currentTimeMillis()
    val pendingBuffers = ConcurrentLinkedDeque<StreamingBuffer>().apply {
        add(StreamingBuffer(requestId, streamingTrack, StreamingTrack.STATE_REQUEST))
    }

    // Per-type pacing between delivered events; falls back to 0 when the type has no entry.
    val pacingDelay = StreamingTrack.TYPE_DELAY_TIMES_CONFIG[streamingTrack.type]?:0

    // Consume the queue on the main dispatcher (copes with the main thread being blocked).
    viewModelScope.launch(Dispatchers.Main) {
        responseStreamCallsSync(pacingDelay, pendingBuffers, bufferCallback)
    }

    // Start the request; its callbacks only enqueue events. The returned call is recorded
    // in streamingRequestCalls (used to clean up finished requests).
    ChatSteam.request(
        streamRequest,
        streamingTrack,
        printing = { index, buffer, trackOrigin ->
            pendingBuffers.add(StreamingBuffer(requestId, trackOrigin, StreamingTrack.STATE_BUFFERING, index, buffer))
        },
        finish = { trackOrigin ->
            pendingBuffers.add(StreamingBuffer(requestId, trackOrigin, StreamingTrack.STATE_RESPOND))
        }
    ).also { startedCall -> streamingRequestCalls.add(startedCall) }
}
/**
 * Drains [streamingTrackCalls] from the head, passing each queued buffer to [bufferCallback]
 * as a track updated with the buffer's state, index and text.
 *
 * After delivering a `STATE_REQUEST` or `STATE_BUFFERING` entry the function waits
 * [streamTypeDelay] ms before taking the next one. When the queue is empty it waits
 * `StreamingTrack.TYPE_DELAY_TIMES_WAIT` ms and polls again. It returns after delivering
 * a `STATE_RESPOND` entry (or any entry with another state).
 *
 * Implemented as a loop rather than self-recursion: the recursive form chained one more
 * continuation per delivered buffer and per idle poll, so memory grew for the whole
 * lifetime of the stream, and with a zero delay (where `delay` does not suspend) a backlog
 * of buffers recursed on the real call stack.
 *
 * @param streamTypeDelay pause in milliseconds between consecutive deliveries.
 * @param streamingTrackCalls queue of pending buffers; producers append, this function removes.
 * @param bufferCallback invoked once per dequeued buffer, on the caller's coroutine context.
 */
private suspend fun responseStreamCallsSync(streamTypeDelay:Long, streamingTrackCalls:ConcurrentLinkedDeque<StreamingBuffer>, bufferCallback: (StreamingTrack) -> Unit) {
    while (true) {
        // pollFirst() atomically takes the head, or returns null when nothing is queued.
        val streamingBuffer = streamingTrackCalls.pollFirst()
        if (streamingBuffer == null) {
            // Gap in the stream (e.g. the response has not started yet, or is lagging):
            // wait briefly and poll again.
            delay(StreamingTrack.TYPE_DELAY_TIMES_WAIT)
            continue
        }

        val bufferingState = streamingBuffer.bufferingState
        val bufferingIndex = streamingBuffer.bufferingIndex
        val bufferingText = streamingBuffer.bufferingText

        // Deliver the buffered event to the caller.
        bufferCallback(streamingBuffer.streamingTrack.withHttpState(bufferingState).withBuffer(bufferingIndex, bufferingText))

        when (bufferingState) {
            // More events are expected: pace the next delivery.
            StreamingTrack.STATE_REQUEST, StreamingTrack.STATE_BUFFERING -> delay(streamTypeDelay)
            // STATE_RESPOND (or an unrecognised state) ends consumption.
            else -> return
        }
    }
}