美文网首页
langchain项目分析

langchain项目分析

作者: 以梦为马驾驾驾 | 来源:发表于2023-03-16 12:56 被阅读0次

    首先可以学习一些使用langchain的项目:

    用到的python库:
    pydantic: 数据验证: 参考: https://www.cnblogs.com/fengqiang626/p/13307771.html
    typing: 类型约束
    manifest-ml: 多个大模型(不包含所有)的统一调用客户端
    promptlayer: 记录对openapi的调用, 搜索历史, 性能追踪.
    unstructured: 若用户在使用 0.4.9 以下版本, 读取到的信息没有metadata字段, 而如果大于等于0.4.9则有
    tenacity: 重试

    用到的python特性:
    dataclass: 合于存储数据对象(data object)的Python类, 参考 https://zhuanlan.zhihu.com/p/59657729

    image.png

    LLM 模型层

    image.png

    BaseLanguageModel : 抽象基类, 和各个模型交互的通用行为:基于用户的输入生成prompt
    BaseLLM: 通用的基础大模型基类, 增加了缓存选项, 回调选项, 有部分序列化能力, 持有各种参数.

    LLM : 和大模型的交互抽象, 所有子类都有自己的交互实现. 对它的调用, 将直接获取完全的prompt, 配合大模型特有的参数, 如temperture, length, top_p等等, 组装后, 利用一个client(组装的逻辑,既可以让专有client,即sdk吃掉,也可以组装后, 给到httpclient后者本地进程)发送给背后的大模型.

    BaseOpenAI 为了OpenAI设计的专有类, 因为OpenAI的模型有不同的调用平台, 如部署在azure的, 官网自己的. NOTICE: 如果模型是"gpt-3.5-turbo" 则, 直接返回OpenAIChat. 否则说明是其他模型
    贴一段基类的模板代码, 分析:

    1. 获取调用参数(组装openAI模型调用的请求体,请求参数)
    2. 获取子prompts
    3. 监测token的消耗
    4. 对每个prompt开启调用(重试机制)
    5. 生成统一的结果
        def _generate(
            self, prompts: List[str], stop: Optional[List[str]] = None
        ) -> LLMResult:
            """Call out to OpenAI's endpoint with k unique prompts.
    
            Args:
                prompts: The prompts to pass into the model.
                stop: Optional list of stop words to use when generating.
    
            Returns:
                The full LLM output.
    
            Example:
                .. code-block:: python
    
                    response = openai.generate(["Tell me a joke."])
            """
            # TODO: write a unit test for this
            params = self._invocation_params
            sub_prompts = self.get_sub_prompts(params, prompts, stop)
            choices = []
            token_usage: Dict[str, int] = {}
            # Get the token usage from the response.
            # Includes prompt, completion, and total tokens used.
            _keys = {"completion_tokens", "prompt_tokens", "total_tokens"}
            for _prompts in sub_prompts:
                if self.streaming:
                    if len(_prompts) > 1:
                        raise ValueError("Cannot stream results with multiple prompts.")
                    params["stream"] = True
                    response = _streaming_response_template()
                    for stream_resp in completion_with_retry(
                        self, prompt=_prompts, **params
                    ):
                        self.callback_manager.on_llm_new_token(
                            stream_resp["choices"][0]["text"],
                            verbose=self.verbose,
                            logprobs=stream_resp["choices"][0]["logprobs"],
                        )
                        _update_response(response, stream_resp)
                    choices.extend(response["choices"])
                else:
                    response = completion_with_retry(self, prompt=_prompts, **params)
                    choices.extend(response["choices"])
                if not self.streaming:
                    # Can't update token usage if streaming
                    update_token_usage(_keys, response, token_usage)
            return self.create_llm_result(choices, prompts, token_usage)
    
    

    PromptLayerOpenAI... : 只是增加了promptlayer

    OpenAIChat : 专和OpenAI的3.5模型交互的类
    贴一段参数准备的代码:

    1. 校验参数的个数, 只有一个prompt
    2. 预设参数, user, prefix, max_tokens
        def _get_chat_params(
            self, prompts: List[str], stop: Optional[List[str]] = None
        ) -> Tuple:
            if len(prompts) > 1:
                raise ValueError(
                    f"OpenAIChat currently only supports single prompt, got {prompts}"
                )
            messages = self.prefix_messages + [{"role": "user", "content": prompts[0]}]
            params: Dict[str, Any] = {**{"model": self.model_name}, **self._default_params}
            if stop is not None:
                if "stop" in params:
                    raise ValueError("`stop` found in both the input and default params.")
                params["stop"] = stop
            if params.get("max_tokens") == -1:
                # for ChatGPT api, omitting max_tokens is equivalent to having no limit
                del params["max_tokens"]
            return messages, params
    

    Chains 模块

    Chains是组合其他各层的胶水, 亦是用户程序的调用入口.
    Chain: 基类, 是所有chain对象的基本入口. 与用户程序交互, 处理用户的输入, 准备其他模块的输入, 提供内存能力, chain的回调能力. 其他所有的 Chain 类都继承自这个基类,并根据需要实现特定的功能。chain通过传入string值, 来控制接受的输入和给出的输出. 如input_key为["abc", "def"], 那么它只会处理用户输入的dict里面的这两个参数, 如果output_key为["uvw"], 那么它在输出的时候会过滤掉其他的dict值.

    继承Chain的子类主要有两种类型:

    1. 通用 generic chain: 不在乎chain的具体类型, 控制chain的调用顺序, 是否调用. 他们可以用来合并构造其他的chain.
    2. 具体chain: 和通用chain比较来说, 他们承担了具体的某项任务, 可以和通用的chain组合起来使用, 也可以直接使用.
      LLMChain : 针对语言模型LLM的查询, 可以格式化prompt以及调用语言模型.
      其他 Chain 类 :除了 LLMChain,还有其他继承自 Chain 的类,它们根据不同的需求实现了特定的功能。例如,有些 Chain 类可能用于处理文本数据,有些可能用于处理图像数据,有些可能用于处理音频数据等。这些类都继承自 Chain 基类,并根据需要实现特定的输入和输出处理方法。

    具体的chain就像是大模型在某种业务场景下的应用模式总结, 如VectorDBQA 就是利用 vector存储和大模型, 在vectorstore中用某种相似算法找到和问题类似的doc, 之后利用大模型将doc和问题一起给到到模型, 让大模型解释给出结果. 杜宇这类合并文档的任务: BaseCombineDocumentsChain 有四种不同的模式.

    image.png image.png

    generic chain

    loading from hub

    从hub里获取某个配置好的chain, 实际的chain类型不会超过库里已经定义的. 举例:
    https://langchain.readthedocs.io/en/latest/modules/chains/generic/from_hub.html

    从服务端拉下来一个VectorDBQA类型的chain, 本地根据拉下来的配置初始化chain.

    LLM Chain

    对大模型最直接的调用chain, 常常被用来组合成其他的chain.

    如 APIChain中, SequentialChain中

    template = """Write a {adjective} poem about {subject}."""
    prompt = PromptTemplate(template=template, input_variables=["adjective", "subject"])
    llm_chain = LLMChain(prompt=prompt, llm=OpenAI(temperature=0), verbose=True)
    
    llm_chain.predict(adjective="sad", subject="ducks")
    

    Sequentical Chain

    顺序执行chains

    • SimpleSequentialChain: 前者的输出就是后者的输入
    • SequentialChain: 允许多个输入和输出

    Serialization

    所有的chain都可以持久化, 以及从持久化中恢复. 具体在
    langchain.chains.load_chain 方法中, load_chain_from_file 从file(虚拟file,可能是网络) 中读取到配置后, load_chain_from_config从配置中获取所有的chain的配置信息,

    针对每一种chain都有对应的load方法

    type_to_loader_dict = {
        "api_chain": _load_api_chain,
        "hyde_chain": _load_hyde_chain,
        "llm_chain": _load_llm_chain,
        "llm_bash_chain": _load_llm_bash_chain,
        "llm_checker_chain": _load_llm_checker_chain,
        "llm_math_chain": _load_llm_math_chain,
        "llm_requests_chain": _load_llm_requests_chain,
        "pal_chain": _load_pal_chain,
        "qa_with_sources_chain": _load_qa_with_sources_chain,
        "stuff_documents_chain": _load_stuff_documents_chain,
        "map_reduce_documents_chain": _load_map_reduce_documents_chain,
        "map_rerank_documents_chain": _load_map_rerank_documents_chain,
        "refine_documents_chain": _load_refine_documents_chain,
        "sql_database_chain": _load_sql_database_chain,
        "vector_db_qa_with_sources_chain": _load_vector_db_qa_with_sources_chain,
        "vector_db_qa": _load_vector_db_qa,
    }
    
    

    举例:

    def _load_stuff_documents_chain(config: dict, **kwargs: Any) -> StuffDocumentsChain:
        if "llm_chain" in config:
            llm_chain_config = config.pop("llm_chain")
            llm_chain = load_chain_from_config(llm_chain_config)
        elif "llm_chain_path" in config:
            llm_chain = load_chain(config.pop("llm_chain_path"))
        else:
            raise ValueError("One of `llm_chain` or `llm_chain_config` must be present.")
    
        if not isinstance(llm_chain, LLMChain):
            raise ValueError(f"Expected LLMChain, got {llm_chain}")
    
        if "document_prompt" in config:
            prompt_config = config.pop("document_prompt")
            document_prompt = load_prompt_from_config(prompt_config)
        elif "document_prompt_path" in config:
            document_prompt = load_prompt(config.pop("document_prompt_path"))
        else:
            raise ValueError(
                "One of `document_prompt` or `document_prompt_path` must be present."
            )
    
        return StuffDocumentsChain(
            llm_chain=llm_chain, document_prompt=document_prompt, **config
        )
    

    更加细致的组件有:
    llm的loader, prompt的loader, 等等, 分别在每个模块下的loading.py文件中

    transformation chain

    提供了一个机制, 对用户的输入进行修改. 举例:

    def transform_func(inputs: dict) -> dict:
        text = inputs["text"]
        shortened_text = "\n\n".join(text.split("\n\n")[:3])
        return {"output_text": shortened_text}
    
    transform_chain = TransformChain(input_variables=["text"], output_variables=["output_text"], transform=transform_func)
    

    输入层 prompt

    prompt传递给大模型的消息模板和抽象. 一般会在prompt中放三类内容:

    1. 对大模型的指示, 会话的设置, 如: 你是一个技术精湛的程序员
    2. 一些example, 以帮助大模型更好的理解输入和给出输出
    3. 提出的问题
    image.png

    BasePromptTemplate 作为基类, 暴露格式化prompt模板的方法, 返回一个prompt.
    参数:

    • input_variables, promptTemplate内部需要接受的参数
    • output_parser: 对于大模型的返回, 可以用output_parser来解析它的返回. output_parser 的解析侧重是: 持有大模型返回的消息的结构, 做信息的提取, 如返回是一个json, json有字段为realAns, resAns是prompt需要的答案, 再或者返回的数据需要以;切分为列表; 而不是侧重网络数据解析, 序列化反序列化.

    @abstractmethod def format_prompt(self, **kwargs: Any) -> PromptValue: 格式化用户输入, 得到prompt
    @root_validator() def validate_variable_names(cls, values: Dict) -> Dict: 验证输入, 用户输入是否覆盖了partial 输入

    需要关注的是两个子类: PromptTemplate, ChatPromptTemplate 前者是一次问答型业务常用的, 后者是问答聊天型业务设计的.

    PromptTemplate 利用的语言自己的format能力 或者 其他库(如web开发常用的ninja引擎), 举例, 如从template中提取用户需要输入的变量, template如: i am a {someadj} student 其中someadj就是用户要输入的变量.

            input_variables = {
                v for _, v, _, _ in Formatter().parse(template) if v is not None
            }
    

    ChatPromptTemplate :
    专属chat的prompt设计, 存储着chat的messages/templates

        @classmethod
        def from_role_strings(
            cls, string_messages: List[Tuple[str, str]]
        ) -> ChatPromptTemplate:
            messages = [
                ChatMessagePromptTemplate(
                    content=PromptTemplate.from_template(template), role=role
                )
                for role, template in string_messages
            ]
            return cls.from_messages(messages)
    

    BaseMessagePromptTemplate 是chat中的一条消息/模板, 对应消息有speaker, AI, Human,System => ChatMessage, 如下, 本质和StringPromptTemplate没太大区别, 只是它的含义是chat的一条message, 且不能(适合)像StringPromptTemplate一样放多个问题在里面.

    image.png

    数据导入- loader

    document_loaders 中含有大量的不同数据源的loader, loader的基本逻辑是: 连接到数据源, 拉取数据, 按照指定的大小切块.
    BaseLoader 接口
    Document 统一的数据表示, 不同的数据源的数据都要表示成Document, 方便Splitter的设计和处理.

    UnstructuredBaseLoader 为例

        def _get_elements(self) -> List:
            """Get elements."""
    
        @abstractmethod
        def _get_metadata(self) -> dict:
            """Get metadata."""
    
        def load(self) -> List[Document]:
            """Load file."""
            elements = self._get_elements()
            if self.mode == "elements":
                docs: List[Document] = list()
                for element in elements:
                    metadata = self._get_metadata()
                    # NOTE(MthwRobinson) - the attribute check is for backward compatibility
                    # with unstructured<0.4.9. The metadata attributed was added in 0.4.9.
                    if hasattr(element, "metadata"):
                        metadata.update(element.metadata.to_dict())
                    if hasattr(element, "category"):
                        metadata["category"] = element.category
                    docs.append(Document(page_content=str(element), metadata=metadata))
            elif self.mode == "single":
                metadata = self._get_metadata()
                text = "\n\n".join([str(el) for el in elements])
                docs = [Document(page_content=text, metadata=metadata)]
            else:
                raise ValueError(f"mode of {self.mode} not supported.")
            return docs
    
    

    数据访问 - store

    Docstore 访问存储了docs的接口, 任何的数据库,网络,甚至内存块都可以成为一个store, 只要实现了search接口, 能够从中搜索doc.

    举例: 把wiki当做是一个store

        def search(self, search: str) -> Union[str, Document]:
            """Try to search for wiki page.
    
            If page exists, return the page summary, and a PageWithLookups object.
            If page does not exist, return similar entries.
            """
            import wikipedia
    
            try:
                page_content = wikipedia.page(search).content
                url = wikipedia.page(search).url
                result: Union[str, Document] = Document(
                    page_content=page_content, metadata={"page": url}
                )
            except wikipedia.PageError:
                result = f"Could not find [{search}]. Similar: {wikipedia.search(search)}"
            except wikipedia.DisambiguationError:
                result = f"Could not find [{search}]. Similar: {wikipedia.search(search)}"
            return result
    

    记忆模块 memory

    大模型在会话的时候, 多数是无状态的, 需要调用者自己维持context. memory是通用的记忆模块, 帮助所有基于langchain做开发的应用维持会话的context, 或者是(a concept of state around through a user's interactions).
    有两种使用机制: 一种是可以从memory中提取一定的信息, 如messages的序列. 还可以是 在chain中直接使用memory, memory和chain本身会将 context 以某种形式(可能是经过llm精炼过的, 不是原始的kv) 存储下来.

    举例: ConversationBufferMemory

        def save_context(self, inputs: Dict[str, Any], outputs: Dict[str, str]) -> None:
            """Save context from this conversation to buffer."""
            if self.input_key is None:
                prompt_input_key = get_prompt_input_key(inputs, self.memory_variables)
            else:
                prompt_input_key = self.input_key
            if self.output_key is None:
                if len(outputs) != 1:
                    raise ValueError(f"One output key expected, got {outputs.keys()}")
                output_key = list(outputs.keys())[0]
            else:
                output_key = self.output_key
            self.chat_memory.add_user_message(inputs[prompt_input_key])
            self.chat_memory.add_ai_message(outputs[output_key])
    

    举例: ConversationEntityMemory
    先是使用大模型从历史对话的k条内容中提取名词实体, 之后作为参数

    
      def load_memory_variables(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
            """Return history buffer."""
            chain = LLMChain(llm=self.llm, prompt=self.entity_extraction_prompt)
            if self.input_key is None:
                prompt_input_key = get_prompt_input_key(inputs, self.memory_variables)
            else:
                prompt_input_key = self.input_key
            buffer_string = get_buffer_string(
                self.buffer[-self.k * 2 :],
                human_prefix=self.human_prefix,
                ai_prefix=self.ai_prefix,
            )
            output = chain.predict(
                history=buffer_string,
                input=inputs[prompt_input_key],
            )
            if output.strip() == "NONE":
                entities = []
            else:
                entities = [w.strip() for w in output.split(",")] // 历史中提取到的实体
            entity_summaries = {}
            for entity in entities:
                entity_summaries[entity] = self.store.get(entity, "") // 获取每个实体的值, 并且更新
            self.entity_cache = entities // 将此次抽取的实体们保存下来
            if self.return_messages: 
                buffer: Any = self.buffer[-self.k * 2 :] 
            else:
                buffer = buffer_string
            return {
                self.chat_history_key: buffer,
                "entities": entity_summaries,
            }
    
    

    将历史保存起来: 从上一次的实体抽取中获取每个抽取的实体, 之后利用大模型进行总结, 将每个实体的总结结果保存起来. 稍微留一下代码chain.predict : 在langchain项目中, 很多用到这样的传参方式, 虽然chain用不到那些参数, 但是这个chain实例内的prompt会用. 对于熟悉静态类型编程的同学,一般都不太习惯这种方式, 一般情况下更多的会用一个context类来向下传递参数吧. todo: 优化: 设计ParamContext来传递参数, 使用者使用ParamConsumer来从中提取关注的信息, 这样可以避免dict, kwargs满天飞的场景

    
        def save_context(self, inputs: Dict[str, Any], outputs: Dict[str, str]) -> None:
            """Save context from this conversation to buffer."""
            super().save_context(inputs, outputs)
            if self.input_key is None:
                prompt_input_key = get_prompt_input_key(inputs, self.memory_variables)
            else:
                prompt_input_key = self.input_key
            for entity in self.entity_cache:
                chain = LLMChain(llm=self.llm, prompt=self.entity_summarization_prompt)
                # key value store for entity
                existing_summary = self.store.get(entity, "")
                buffer_string = get_buffer_string(
                    self.buffer[-self.k * 2 :],
                    human_prefix=self.human_prefix,
                    ai_prefix=self.ai_prefix,
                )
    
                output = chain.predict(
                    summary=existing_summary,
                    history=buffer_string,
                    input=inputs[prompt_input_key],
                    entity=entity,
                )
                self.store[entity] = output.strip()
    

    agent and tools 和其他项目的交互

    agent

    Agent 负责调用大模型, 并且决定接下来的动作. 调用大模型的结果是做决定的参考:
    plan -> _get_next_action -> llm(专属prompt).predict -> _extract_tool_and_input 获取下一个action以及要给它的输入, 得到 AgentAction

    AgentAction : agent要执行的动作封装.

    AgentExecutor : 从命名上理解是 Agent的执行环境, 执行器. Agent 只能决定接下来要执行的动作, 而AgentExecutor 才是具体发起执行, 进行执行的执行者. _take_next_step -> Agent.plan -> tool.run

    tools

    BaseTool 子类要重写 run 以赋予不同的运行逻辑

    向量化 和 存储 vector

    VectorStore : 向量存储接口, 子类需要实现抽象方法(加粗的)以完成对应引擎的访问.

    • add_texts
    • add_documents
    • similarity_search
    • similarity_search_by_vector
    • max_marginal_relevance_search
    • max_marginal_relevance_search_by_vector
    • from_documents
    • from_texts

    举例: OpenSearchVectorStore

    调用 大模型的embedding 能力, 将其向量化, 并且写入索引. (需预先创建好索引, 自动创建的索引数据类型不对)

        def add_texts(
            self,
            texts: Iterable[str],
            metadatas: Optional[List[dict]] = None,
            bulk_size: int = 500,
            **kwargs: Any,
        ) -> List[str]:
            """Run more texts through the embeddings and add to the vectorstore.
    
            Args:
                texts: Iterable of strings to add to the vectorstore.
                metadatas: Optional list of metadatas associated with the texts.
                bulk_size: Bulk API request count; Default: 500
    
            Returns:
                List of ids from adding the texts into the vectorstore.
            """
            embeddings = [
                self.embedding_function.embed_documents(list(text))[0] for text in texts
            ]
            _validate_embeddings_and_bulk_size(len(embeddings), bulk_size)
            return _bulk_ingest_embeddings(
                self.client, self.index_name, embeddings, texts, metadatas
            )
    
    
    

    相似度搜索, 还是用大模型的embedding能力, 将查询转化为向量, 然后利用存储引擎自身的相似度搜索能力, 搜索出文档. 搜索完成后, 取k条结果, 返回给上层.

            embedding = self.embedding_function.embed_query(query)
            search_type = _get_kwargs_value(kwargs, "search_type", "approximate_search")
            if search_type == "approximate_search":
                size = _get_kwargs_value(kwargs, "size", 4)
                search_query = _default_approximate_search_query(embedding, size, k)
            elif search_type == SCRIPT_SCORING_SEARCH:
                space_type = _get_kwargs_value(kwargs, "space_type", "l2")
                pre_filter = _get_kwargs_value(kwargs, "pre_filter", MATCH_ALL_QUERY)
                search_query = _default_script_query(embedding, space_type, pre_filter)
            elif search_type == PAINLESS_SCRIPTING_SEARCH:
                space_type = _get_kwargs_value(kwargs, "space_type", "l2Squared")
                pre_filter = _get_kwargs_value(kwargs, "pre_filter", MATCH_ALL_QUERY)
                search_query = _default_painless_scripting_query(
                    embedding, space_type, pre_filter
                )
    

    from_texts: 构造vectorstore, 创建新索引, 且将数据写入. 封装了这一系列过程.

    embedding 向量接口, 每个子类需要实现两个方法, 分别向量化文档和查询. 在实现中要注意的细节点是, 和大模型交互, 控制每次发送的数据量. 以及增加重试机制.

    def embed_documents(self, texts: List[str]) -> List[List[float]]:

    def embed_query(self, text: str) -> List[float]:

    相关文章

      网友评论

          本文标题:langchain项目分析

          本文链接:https://www.haomeiwen.com/subject/bnnhrdtx.html