LangChain: Runnable的定义和使用

时间:2025-03-13 11:03:40

1. Runnable定义

1.1 定义

  在LangChain中,Runnable是LangChain中用于定义一个可运行对象的抽象接口。它允许开发者定义任何执行某种操作的逻辑单元,并通过标准化的方法使其能够在更大的系统中无缝协作。

1.2 LCEL

  通常情况下,LangChain中的链包含: 大模型(LLMs)、提示词模版(Prompt Template)、工具(Tools)和输出解析器(Output Parsers)。这几部分都继承并实现了Runnable接口,所以他们都是Runnable的实例。
  在LangChain中,可以使用LangChain Expression Language(LCEL)将多个Runnable组合成链。其具体的Runnable组合方式主要有两种:

  • RunnableSequence:按顺序调用一系列可运行文件,其中一个Runnable的输出作为下一个的输入。一般通过使用|运算符或可运行项列表来构造。具体举例如下:
from langchain_core.runnables import RunnableSequence
#方法1
chain=prompt|llm
#方法2
chain = RunnableSequence([prompt,llm])
  • RunnableParallell:同时调用多Runnable。在序列中使用dict或通过将dict传递给RunnableParallel来构造它。具体举例如下
from langchain_core.runnables import RunnableLambda

def add_one(x: int) -> int:
    return x + 1

def mul_two(x: int) -> int:
    return x * 2

def mul_three(x: int) -> int:
    return x * 3

runnable_1 = RunnableLambda(add_one)
runnable_2 = RunnableLambda(mul_two)
runnable_3 = RunnableLambda(mul_three)
#方法1
sequence = runnable_1 | { 
    "mul_two": runnable_2,
    "mul_three": runnable_3,
}
#方法2
sequence = runnable_1 | RunnableParallel(
     {"mul_two": runnable_2, "mul_three": runnable_3})

2. Runnable使用举例

2.1 常用方法

  Runnable接口的核心方法中包含用于支持异步执行和流式处理的方法,如streaminvokebatch及其异步版本astreamainvokeabatch等。因为在谦虚博客中介绍了Prompt Template中的这些方法的用途,所以这里就不再赘述了,具体可以参考:/yeshang_lady/article/details/140753520

2.2 常用类
2.2.1 RunnableLambda和RunnableGenerator

  RunnableLambdaRunnableGenerator这两个类通常用来自定义Runnable。这两者的主要区别在于:

  • RunnableLambda:是将Python中的可调用对象包装成Runnable, 这种类型的Runnable无法处理流式数据。
  • RunnableGenerator:将Python中的生成器包装成Runnable,可以处理流式数据。

具体用法举例如下:

from langchain_core.runnables import RunnableLambda,RunnableGenerator
from dotenv import load_dotenv,find_dotenv
_=load_dotenv(find_dotenv())
from langchain_openai import ChatOpenAI
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import CommaSeparatedListOutputParser
prompt=PromptTemplate.from_template("输出1到{max_value}之间的所有整数。每个数字之间用逗号,分隔。")
def add_one(x):
    return ' '.join([str((int(i)+1)) for i in x])
runnable=RunnableLambda(add_one)
llm=ChatOpenAI()
chain=prompt|llm|CommaSeparatedListOutputParser()|runnable
print(chain.invoke({"max_value":"10"}))
#流式处理
stream_llm=ChatOpenAI(model_kwargs={"stream":True})
def stream_add_one(x):
    for i in x:
        if i.content.isdigit():
            yield str((int(i.content)+1))
stream_chain=prompt|stream_llm|RunnableGenerator(stream_add_one)
for chunk in stream_chain.stream({"max_value":"10"}):
    print(chunk)
2.2.2 RunnableBinding

  RunnableBinding可以看作是Runnable的装饰器,它允许在不改变原函数代码的前提下,动态地添加或修改函数的功能。Runnable中可以通过以下方法创建RunnableBinding类或其子类。具体如下:

  • bind: 绑定运行参数kwargs。比如,可以将常用的方法(比如invoke, batch, transform, stream及其他)中的可选参数到Runnable上。
  • with_config:绑定config。
  • with_listeners:绑定生命周期监听器。Runnable可以设置三类监听器:on_starton_endon_error。通过监视器可以获取相关运行信息,包括其id、类型、输入、输出、错误、start_time、end_time以及其他标记和元数据。具体举例如下:
from langchain_core.runnables import RunnableLambda
from langchain_core.tracers.schemas import Run
import time

def add_one(a):
    try:
        return a+1
    except Exception as e:
        print("Error: 数据类型错误,无法进行加法运算",e)

def fn_start(run_obj: Run):
    print("Runnable开始运行时间:",run_obj.start_time)

def fn_end(run_obj: Run):
    print("Runnable结束运行时间:",run_obj.end_time)
    
def fn_error(run_obj: Run):
    print(run_obj.error)

runnable=RunnableLambda(add_one).with_listeners(on_start=fn_start,
                                                on_end=fn_end,
                                                on_error=fn_error)
runnable.invoke(2)
runnable.invoke("2")

其执行结果如下:

Runnable开始运行时间: 2024-07-31 07:03:45.731580+00:00
Runnable结束运行时间: 2024-07-31 07:03:45.732749+00:00
Runnable开始运行时间: 2024-07-31 07:03:45.733600+00:00
Error: 数据类型错误,无法进行加法运算 can only concatenate str (not "int") to str
Runnable结束运行时间: 2024-07-31 07:03:45.734155+00:00
  • with_types:覆盖输入和输出类型。
  • with_fallbacks:绑定回退策略。
  • with_retry:绑定重试策略。
2.2.3 RunnableEach

  RunnableEach是一个用于批量处理任务的组件,它可以对一组输入数据分别应用指定的Runnable组件。举例如下:

from langchain_core.runnables import RunnableLambda
from langchain_core.runnables.base import RunnableEach

def add_one(a):
    try:
        return a+1
    except Exception as e:
        print("Error: 数据类型错误,无法进行加法运算",e)
runnable=RunnableEach(bound=RunnableLambda(add_one))
print(runnable.invoke([1,2,4]))
2.2.4 RunnableBranch

  RunnableBranch可以依据不同的条件选择不同的分支,执行不同的功能。具体如下:

from langchain_core.runnables import RunnableBranch,RunnableLambda

def add_int_one(a):
    try:
        return a+1
    except Exception as e:
        print("Error:",e)
def add_str_one(a):
    try:
        return chr(ord(a)+1)
    except Exception as e:
        print("Error:",e)

runnable=RunnableBranch(
    (lambda x:type(x)==str,RunnableLambda(add_str_one)),
    (lambda x:type(x)==int,RunnableLambda(add_int_one)),
    lambda x:x,
)
print(runnable.invoke('a'))
print(runnable.invoke(1))

参考资料

  1. /en/latest/core_api_reference.html#module-langchain_core.runnables