使用 Python Flask API 流式返回 LangChain 的 OpenAI 响应
原标题:Stream a response from LangChain's OpenAI with Python Flask API

我正在使用 Python Flask 应用来实现基于数据的聊天。在控制台(console)里,我可以直接从 OpenAI 得到流式响应,因为我可以通过标志 `streaming=True` 启用流式输出。

问题在于,我无法在自己的 API 调用中“转发”这个流,也无法把它“显示”出来。


def askQuestion(self, collection_id, question):
    """Answer ``question`` against one Chroma collection and return a plain dict.

    Builds a streaming ``ChatOpenAI`` model, a conversation memory and a
    ``ConversationalRetrievalChain`` over the collection named
    ``"collection-<collection_id>"``, runs the chain once and flattens the
    result.

    Note: tokens are streamed only to stdout (via
    ``StreamingStdOutCallbackHandler``). The returned dict is built after the
    chain call has returned, so the caller receives the answer in one piece.

    Args:
        collection_id: Identifier appended to ``"collection-"`` to form the
            Chroma collection name.
        question: The user question passed to the chain.

    Returns:
        A dict with the keys ``"answer"`` (the chain's answer) and
        ``"source_documents"`` (a list of dicts holding each document's
        ``page_content`` and ``metadata``).
    """
    collection_name = "collection-" + str(collection_id)
    self.llm = ChatOpenAI(
        model_name=self.model_name,
        temperature=self.temperature,
        openai_api_key=os.environ.get("OPENAI_API_KEY"),
        streaming=True,
        callback_manager=CallbackManager([StreamingStdOutCallbackHandler()]),
    )
    self.memory = ConversationBufferMemory(
        memory_key="chat_history",
        return_messages=True,
        output_key="answer",
    )
    chroma_Vectorstore = Chroma(
        collection_name=collection_name,
        embedding_function=self.embeddingsOpenAi,
        client=self.chroma_client,
    )

    self.chain = ConversationalRetrievalChain.from_llm(
        self.llm,
        chroma_Vectorstore.as_retriever(similarity_search_with_score=True),
        # Needed because result["source_documents"] is read below.
        return_source_documents=True,
        memory=self.memory,
    )

    result = self.chain({"question": question})
    res_dict = {
        "answer": result["answer"],
    }

    # Reduce each source document to JSON-friendly fields.
    res_dict["source_documents"] = [
        {
            "page_content": source.page_content,
            "metadata": source.metadata,
        }
        for source in result["source_documents"]
    ]

    return res_dict


@app.route("/collection/<int:collection_id>/ask_question", methods=["POST"])
def ask_question(collection_id):
    """Answer the posted ``question`` form field for the given collection.

    The response body is produced by a generator wrapped in
    ``stream_with_context`` so the request context stays available while the
    generator runs.
    """
    question = request.form["question"]

    def stream(question):
        completion = document_thread.askQuestion(collection_id, question)
        # NOTE: askQuestion returns a finished dict, so this iterates over the
        # answer string and emits it one character at a time; it does not
        # forward tokens while the model is still generating.
        yield from completion["answer"]

    return app.response_class(stream_with_context(stream(question)))





使用 `threading` 和 `callback`(回调),我们可以从 Flask API 返回流式响应。


class StreamingHandler(BaseCallbackHandler):
    """Callback handler that puts every newly generated LLM token on a queue."""

    def __init__(self, queue) -> None:
        """Remember the queue that receives the tokens.

        Args:
            queue: Any object with a ``put`` method (e.g. ``queue.Queue``);
                a consumer reads the streamed tokens from it.
        """
        super().__init__()
        self.queue = queue

    def on_llm_new_token(self, token: str, **kwargs) -> None:
        """Forward a single token emitted by the streaming LLM to the queue."""
        self.queue.put(token)

页: 1

from flask import Response, stream_with_context
import threading 

def stream_output():
    """Run ``askQuestion`` in a worker thread and stream its tokens as they arrive.

    A ``StreamingHandler`` bound to a fresh queue is handed to ``askQuestion``;
    the returned ``Response`` wraps a generator that drains that queue.

    NOTE(review): ``collection_id`` and ``question`` are not parameters here —
    they must be available in the enclosing scope (e.g. the Flask route), and
    ``askQuestion`` must accept the callback handler as its third argument.

    Returns:
        A Flask ``Response`` whose body is the token stream.
    """
    from queue import Empty, Queue

    q = Queue()
    callback_fn = StreamingHandler(q)
    worker = threading.Thread(
        target=askQuestion, args=(collection_id, question, callback_fn)
    )

    def generate(rq):
        # Keep reading while the worker can still produce tokens or tokens are
        # left in the queue; once the worker has exited and the queue is empty
        # nothing more can arrive, so the loop ends instead of blocking forever.
        while worker.is_alive() or not rq.empty():
            try:
                token = rq.get(timeout=0.1)
            except Empty:
                continue
            yield token

    # The thread has to be started explicitly; constructing it runs nothing.
    worker.start()
    return Response(stream_with_context(generate(q)))

在您的 LangChain `ChatOpenAI` 中,添加上面自定义的回调 `StreamingHandler`。

# Register the custom StreamingHandler instance (``callback_fn``) on the model so
# every generated token is passed to the handler; ``streaming=True`` is required
# for tokens to be emitted one by one.
self.llm = ChatOpenAI(
    model_name=self.model_name,
    temperature=self.temperature,
    openai_api_key=os.environ.get("OPENAI_API_KEY"),
    streaming=True,
    callbacks=[callback_fn],
)




from queue import Queue

# Module-level queue shared by the callback handler (producer) and the code
# that streams tokens to the client (consumer).
token_queue = Queue()


class LLMTokenQueueHandler(BaseCallbackHandler):
    """
    This is to change the behavior of LLMChain to
    store the outputted tokens to a queue
    """

    def on_llm_new_token(
        self,
        token: str,
        **kwargs
        ) -> None:
        """Queue one generated token, tagged with type "token"."""
        token_queue.put({"type": "token", "value": token})

    def on_llm_end(
        self,
        response: LLMResult,
        **kwargs
        ) -> None:
        """Queue an "end" marker so the consumer knows generation finished."""
        token_queue.put({"type": "end"})

def generate_text_response(
    input_query: str
    ) -> None:
    """
    Generate text response from LLM
    note that we are not streaming from this
    function but from the stream_tokens() function

    Args:
        input_query: The user's input, substituted into the prompt template.
    """
    prompt_template = """
    input your prompt template

    Chat History:
    {chat_history}

    Human Input:
    {input_query}
    """

    #adding the LLMTokenQueueHandler to the callback manager
    #so now the tokens are automatically stored into token_queue
    gptchat = ChatOpenAI(
        model_name="model_name",
        temperature=0.25,
        openai_api_key=os.environ.get("OPENAI_API_KEY"),
        streaming=True,
        callback_manager=CallbackManager([LLMTokenQueueHandler()]),
        )

    prompt = PromptTemplate(
        input_variables=[
            "chat_history",
            "input_query",
            #add more variables if needed
            ],
        template=prompt_template,
        )

    llm_chain = LLMChain(
        llm=gptchat,
        prompt=prompt,
        verbose=False,
        )

    #this streaming call triggers the process to
    #store answer tokens to queue
    for chunk_response in llm_chain.stream(
        {
            "input_query": input_query,
            # NOTE(review): no memory object is visible here, so an empty
            # history is supplied; attach a memory to the chain if history
            # should be carried between calls.
            "chat_history": "",
            }
        ):
        # The chunks themselves are not needed: the tokens reach the consumer
        # through the callback handler and token_queue.
        pass

def stream_tokens():
    """Generator function to stream tokens.

    Reads dict items from the module-level ``token_queue``: items of type
    "token" are yielded as UTF-8 encoded bytes, and an item of type "end"
    stops the generator.
    """
    while True:
        # Wait for a token to be available in the queue and retrieve it
        token_dict = token_queue.get()
        print("token_dict: ", token_dict)

        if token_dict["type"] == "token":
            # encode str as byte
            yield token_dict["value"].encode("utf-8")

        # The "end" item marks the end of generation; leave the loop so the
        # response can finish instead of blocking on the queue forever.
        elif token_dict["type"] == "end":
            break

@app.route("/stream", methods=["POST"])
def stream_text_response():
    """
    Stream text response with user input query

    Reads the query from the "stream" key of the JSON request body, starts
    generation in a background thread and returns a streaming response.
    """
    # Fall back to an empty dict so a JSON body of ``null`` does not crash .get().
    input_json = request.get_json() or {}
    input_query = input_json.get("stream", "")

    # Start generate_text_response in a separate thread to avoid blocking
    threading.Thread(
        target=generate_text_response,
        args=(input_query,),
        ).start()

    # Stream tokens back to the client as they are produced
    # not streaming generate_text_response as it doesn't produce
    # the streamed tokens directly
    return Response(
        stream_with_context(stream_tokens())
        )

