AI个人学习
和实操指南

Flow(Laminar):构建智能体的轻量级任务引擎,简化并灵活管理任务

本文于 2024-12-04 18:42 更新,部分内容具有时效性,如有失效,请留言

综合介绍

Flow是一个轻量级的任务引擎,专为构建AI代理而设计,强调简洁性和灵活性。与传统的基于节点和边的工作流不同,Flow采用动态任务队列系统,支持并行执行、动态调度和智能依赖管理。其核心理念是通过并行任务执行、动态工作流和条件分支控制,使复杂的工作流变得简单易行。Flow无需预定义节点之间的边,采用动态任务调度架构,帮助开发者编写更简洁、易于理解的代码。Flow由Laminar团队维护,支持自动化追踪和状态管理,适用于各种AI应用场景。

 

功能列表

  • 并行任务执行:自动并行运行任务,无需显式线程代码。
  • 动态调度:任务可以在运行时调度新任务。
  • 智能依赖管理:任务可以等待前一个操作的结果。
  • 状态管理:保存和加载任务状态,从特定任务开始执行。
  • 条件分支和控制流:支持条件分支和循环控制。
  • 流式任务执行:支持任务执行的流式处理。
  • 自动化追踪:支持OpenTelemetry的自动化追踪,便于调试和状态重建。
  • 轻量级且无外部依赖:设计简洁、灵活且功能强大。

 

使用帮助

安装流程

要安装Flow,只需使用pip命令:

pip install lmnr-flow

基本使用

以下是一个简单的使用示例:

from concurrent.futures import ThreadPoolExecutor
from lmnr_flow import Flow, TaskOutput, NextTask, Context
# 创建Flow实例
flow = Flow(thread_pool_executor=ThreadPoolExecutor(max_workers=4))
# 定义一个简单任务
def my_task(context: Context) -> TaskOutput:
return TaskOutput(output="Hello World!")
# 添加任务到Flow
flow.add_task("greet", my_task)
# 运行任务
result = flow.run("greet")
print(result)  # 输出: {"greet": "Hello World!"}

任务链

任务可以触发其他任务:

def task1(context: Context) -> TaskOutput:
return TaskOutput(output="result1", next_tasks=[NextTask("task2")])
def task2(context: Context) -> TaskOutput:
t1_result = context.get("task1")
return TaskOutput(output="result2")
flow.add_task("task1", task1)
flow.add_task("task2", task2)
flow.run("task1")  # 输出: {"task2": "result2"}

并行执行

多个任务可以并行执行:

def starter(context: Context) -> TaskOutput:
return TaskOutput(output="started", next_tasks=[NextTask("slow_task1"), NextTask("slow_task2")])
def slow_task1(context: Context) -> TaskOutput:
time.sleep(1)
return TaskOutput(output="result1")
def slow_task2(context: Context) -> TaskOutput:
time.sleep(1)
return TaskOutput(output="result2")
flow.add_task("starter", starter)
flow.add_task("slow_task1", slow_task1)
flow.add_task("slow_task2", slow_task2)
flow.run("starter")  # 两个任务并行执行,总耗时约1秒

流式结果

任务可以流式返回中间结果:

def streaming_task(context: Context) -> TaskOutput:
stream = context.get_stream()
for i in range(3):
stream.put(StreamChunk("streaming_task", f"interim_{i}"))
return TaskOutput(output="final")
flow.add_task("streaming_task", streaming_task)
for task_id, output in flow.stream("streaming_task"):
print(f"{task_id}: {output}")

动态工作流

任务可以根据条件动态调度:

def conditional_task(context: Context) -> TaskOutput:
count = context.get("count", 0)
if count >= 3:
return TaskOutput(output="done")
context.set("count", count + 1)
return TaskOutput(output=f"iteration_{count}", next_tasks=[NextTask("conditional_task")])
flow.add_task("conditional_task", conditional_task)
flow.run("conditional_task")  # 任务循环3次后完成

输入参数

任务可以接收输入参数:

def parameterized_task(context: Context) -> TaskOutput:
name = context.get("user_name")
return TaskOutput(output=f"Hello {name}!")
flow.add_task("greet", parameterized_task)
result = flow.run("greet", inputs={"user_name": "Alice"})
print(result)  # 输出: {"greet": "Hello Alice!"}

动态路由

任务可以根据输入动态路由:

def router(context: Context) -> TaskOutput:
task_type = context.get("type")
routes = {
"process": [NextTask("process_task")],
"analyze": [NextTask("analyze_task")],
"report": [NextTask("report_task")]
}
return TaskOutput(output=f"routing to {task_type}", next_tasks=routes.get(task_type, []))
flow.add_task("router", router)
flow.add_task("process_task", lambda ctx: TaskOutput("processed data"))
flow.run("router", inputs={"type": "process"})  # 输出: {"process_task": "processed data"}

状态管理

任务状态可以保存和加载:

context = Context()
context.from_dict({"task1": "result1"})
flow = Flow(context=context)
flow.add_task("task2", lambda ctx: TaskOutput("result2"))
flow.run("task2")
assert flow.context.get("task1") == "result1"
assert flow.context.get("task2") == "result2"

Map Reduce

任务可以进行Map Reduce操作:

def task1(ctx):
ctx.set("collector", [])
return TaskOutput("result1", next_tasks=[NextTask("task2", spawn_another=True) for _ in range(3)])
def task2(ctx):
collector = ctx.get("collector")
collector.append("result2")
ctx.set("collector", collector)
return TaskOutput("", next_tasks=[NextTask("task3")])
def task3(ctx):
collector = ctx.get("collector")
return TaskOutput(collector)
flow.add_task("task1", task1)
flow.add_task("task2", task2)
flow.add_task("task3", task3)
result = flow.run("task1")
assert result == {"task3": ["result2", "result2", "result2"]}

LLM代理

动态工具选择的LLM代理示例:

from typing import List
import json
def llm_agent(context: Context) -> TaskOutput:
prompt = context.get("user_input")
llm_response = {
"reasoning": "Need to search database and format results",
"tools": ["search_db", "format_results"]
}
next_tasks = [NextTask(tool) for tool in llm_response["tools"]]
return TaskOutput(output="LLM agent response", next_tasks=next_tasks)
flow.add_task("llm_agent", llm_agent)
flow.run("llm_agent", inputs={"user_input": "Find data"})

AI轻松学

普通人的AI入门指南

帮助你以低成本、零基础学会如何利用AI工具。AI就像办公软件一样,是每个人的必备技能。 掌握AI会让你在求职中占据优势,在未来的工作和学习中事半功倍。

查看详情>
未经允许不得转载:首席AI分享圈 » Flow(Laminar):构建智能体的轻量级任务引擎,简化并灵活管理任务

首席AI分享圈

首席AI分享圈专注于人工智能学习,提供全面的AI学习内容、AI工具和实操指导。我们的目标是通过高质量的内容和实践经验分享,帮助用户掌握AI技术,一起挖掘AI的无限潜能。无论您是AI初学者还是资深专家,这里都是您获取知识、提升技能、实现创新的理想之地。

联系我们
zh_CN简体中文