General Introduction
Flow is a lightweight task engine for building AI agents, with an emphasis on simplicity and flexibility. Unlike traditional node-and-edge workflows, Flow uses a dynamic task queue that supports parallel execution, dynamic scheduling, and dependency management. Its core idea is to make complex workflows simple: tasks run in parallel, schedule new tasks at runtime, and branch conditionally. Because Flow does not require predefined edges between nodes, its dynamic scheduling architecture helps developers write cleaner, easier-to-understand code. Maintained by the Laminar team, Flow supports automatic tracing and state management and suits a wide range of AI application scenarios.
Feature List
- Parallel task execution: run tasks in parallel automatically, without explicit threading code.
- Dynamic scheduling: tasks can schedule new tasks at runtime.
- Dependency management: tasks can wait for the results of earlier tasks.
- State management: save and load task state, and resume execution from a specific task.
- Conditional branching and control flow: branches and loops are supported.
- Streaming: tasks can emit intermediate results while they run.
- Automatic tracing: OpenTelemetry-based tracing for easy debugging and state reconstruction.
- Lightweight, with no external dependencies: simple by design, yet flexible and powerful.
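To make the queue-based model concrete, here is a minimal pure-Python sketch of dynamic task scheduling (a conceptual illustration, not Flow's actual implementation): each task returns its output plus the names of the tasks to enqueue next, so the graph emerges at runtime instead of being predeclared as edges.

```python
from collections import deque

def run_dynamic(tasks, start):
    """Minimal dynamic-scheduler sketch: each task is a function that takes
    the results so far and returns (output, names_of_next_tasks)."""
    results = {}
    pending = deque([start])
    while pending:
        name = pending.popleft()
        output, next_names = tasks[name](results)
        results[name] = output
        pending.extend(next_names)  # tasks scheduled at runtime, no predefined edges
    return results

tasks = {
    "a": lambda r: ("started", ["b", "c"]),  # fan out to b and c
    "b": lambda r: ("b done", []),
    "c": lambda r: (r["a"] + " -> c", []),   # reads a's result, like Flow's context
}
print(run_dynamic(tasks, "a"))  # → {'a': 'started', 'b': 'b done', 'c': 'started -> c'}
```

Flow layers parallel execution, a shared context, and tracing on top of this basic idea.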
Usage Guide
Installation
To install Flow, simply use the pip command:
```shell
pip install lmnr-flow
```
Basic Usage
Below is a simple example of how to use it:
```python
from concurrent.futures import ThreadPoolExecutor

from lmnr_flow import Flow, TaskOutput, NextTask, Context

# Create a Flow instance
flow = Flow(thread_pool_executor=ThreadPoolExecutor(max_workers=4))

# Define a simple task
def my_task(context: Context) -> TaskOutput:
    return TaskOutput(output="Hello World!")

# Add the task to the Flow
flow.add_task("greet", my_task)

# Run the task
result = flow.run("greet")
print(result)  # Output: {"greet": "Hello World!"}
```
Task Chaining
Tasks can trigger other tasks:
```python
def task1(context: Context) -> TaskOutput:
    return TaskOutput(output="result1", next_tasks=[NextTask("task2")])

def task2(context: Context) -> TaskOutput:
    t1_result = context.get("task1")  # task1's result is available via the shared context
    return TaskOutput(output="result2")

flow.add_task("task1", task1)
flow.add_task("task2", task2)
flow.run("task1")  # Output: {"task2": "result2"}
```
Parallel Execution
Multiple tasks can run in parallel:
```python
import time

def starter(context: Context) -> TaskOutput:
    return TaskOutput(output="started", next_tasks=[NextTask("slow_task1"), NextTask("slow_task2")])

def slow_task1(context: Context) -> TaskOutput:
    time.sleep(1)
    return TaskOutput(output="result1")

def slow_task2(context: Context) -> TaskOutput:
    time.sleep(1)
    return TaskOutput(output="result2")

flow.add_task("starter", starter)
flow.add_task("slow_task1", slow_task1)
flow.add_task("slow_task2", slow_task2)
flow.run("starter")  # Both slow tasks run in parallel; total time is about 1 second
```
Streaming Results
Tasks can stream intermediate results while they run:
```python
from lmnr_flow import StreamChunk  # assumed importable alongside Flow

def streaming_task(context: Context) -> TaskOutput:
    stream = context.get_stream()
    for i in range(3):
        stream.put(StreamChunk("streaming_task", f"interim_{i}"))
    return TaskOutput(output="final")

flow.add_task("streaming_task", streaming_task)

for task_id, output in flow.stream("streaming_task"):
    print(f"{task_id}: {output}")
```
Dynamic Workflow
Tasks can be dynamically scheduled based on conditions:
```python
def conditional_task(context: Context) -> TaskOutput:
    count = context.get("count", 0)
    if count >= 3:
        return TaskOutput(output="done")
    context.set("count", count + 1)
    return TaskOutput(output=f"iteration_{count}", next_tasks=[NextTask("conditional_task")])

flow.add_task("conditional_task", conditional_task)
flow.run("conditional_task")  # The task loops three times, then completes
```
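The self-scheduling task above is equivalent to this ordinary loop; Flow simply expresses the iteration as a task that re-enqueues itself, which lets each round run through the same scheduler and tracing:

```python
# Plain-Python equivalent of the self-scheduling conditional task
count = 0
outputs = []
while count < 3:
    outputs.append(f"iteration_{count}")
    count += 1
outputs.append("done")
print(outputs)  # → ['iteration_0', 'iteration_1', 'iteration_2', 'done']
```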
Input Parameters
Tasks can receive input parameters:
```python
def parameterized_task(context: Context) -> TaskOutput:
    name = context.get("user_name")
    return TaskOutput(output=f"Hello {name}!")

flow.add_task("greet", parameterized_task)
result = flow.run("greet", inputs={"user_name": "Alice"})
print(result)  # Output: {"greet": "Hello Alice!"}
```
Dynamic Routing
Tasks can be routed dynamically based on their input:
```python
def router(context: Context) -> TaskOutput:
    task_type = context.get("type")
    routes = {
        "process": [NextTask("process_task")],
        "analyze": [NextTask("analyze_task")],
        "report": [NextTask("report_task")],
    }
    return TaskOutput(output=f"routing to {task_type}", next_tasks=routes.get(task_type, []))

flow.add_task("router", router)
flow.add_task("process_task", lambda ctx: TaskOutput("processed data"))
flow.run("router", inputs={"type": "process"})  # Output: {"process_task": "processed data"}
```
State Management
Task state can be saved and loaded:
```python
context = Context()
context.from_dict({"task1": "result1"})

flow = Flow(context=context)
flow.add_task("task2", lambda ctx: TaskOutput("result2"))
flow.run("task2")

assert flow.context.get("task1") == "result1"
assert flow.context.get("task2") == "result2"
```
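Since the state handed to `Context.from_dict` is a plain dictionary, it can be persisted with the standard `json` module. The helper functions below are illustrative, not part of Flow's API, and assume all task outputs are JSON-serializable:

```python
import json

# Hypothetical persistence helpers: the dict passed to Context.from_dict
# is plain data, so it round-trips through JSON as long as every task
# output is JSON-serializable.
def save_state(state: dict) -> str:
    return json.dumps(state)

def load_state(serialized: str) -> dict:
    return json.loads(serialized)

state = {"task1": "result1", "count": 2}
assert load_state(save_state(state)) == state
```

The restored dict can then be passed to `Context.from_dict` to resume a run from that point.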
Map-Reduce
Tasks can implement a map-reduce pattern:
```python
def task1(ctx):
    ctx.set("collector", [])
    # Fan out: spawn three parallel instances of task2
    return TaskOutput("result1", next_tasks=[NextTask("task2", spawn_another=True) for _ in range(3)])

def task2(ctx):
    collector = ctx.get("collector")
    collector.append("result2")
    ctx.set("collector", collector)
    return TaskOutput("", next_tasks=[NextTask("task3")])

def task3(ctx):
    # Fan in: return the accumulated results
    collector = ctx.get("collector")
    return TaskOutput(collector)

flow.add_task("task1", task1)
flow.add_task("task2", task2)
flow.add_task("task3", task3)
result = flow.run("task1")
assert result == {"task3": ["result2", "result2", "result2"]}
```
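For comparison, here is the same fan-out/fan-in shape in plain Python with `concurrent.futures`; Flow adds the dynamic scheduling and shared context on top of this pattern:

```python
from concurrent.futures import ThreadPoolExecutor

def worker(i):
    # Stand-in for task2: each mapped call produces one result
    return "result2"

# Map: run three workers in parallel; reduce: gather their outputs
with ThreadPoolExecutor(max_workers=3) as pool:
    collected = list(pool.map(worker, range(3)))

print(collected)  # → ['result2', 'result2', 'result2']
```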
LLM Agent
Example of an LLM agent for dynamic tool selection:
```python
def llm_agent(context: Context) -> TaskOutput:
    prompt = context.get("user_input")
    # In a real agent, this response would come from an LLM call;
    # it is hardcoded here for illustration
    llm_response = {
        "reasoning": "Need to search database and format results",
        "tools": ["search_db", "format_results"]
    }
    next_tasks = [NextTask(tool) for tool in llm_response["tools"]]
    return TaskOutput(output="LLM agent response", next_tasks=next_tasks)

flow.add_task("llm_agent", llm_agent)
flow.run("llm_agent", inputs={"user_input": "Find data"})
```
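In practice the tool list would be parsed from the model's raw text rather than hardcoded. A sketch of that parsing step, where the JSON string stands in for a real LLM reply:

```python
import json

# Stand-in for the raw text returned by an LLM asked to choose tools
raw_reply = '{"reasoning": "search then format", "tools": ["search_db", "format_results"]}'

plan = json.loads(raw_reply)
tool_names = plan.get("tools", [])  # tolerate replies that omit the key
print(tool_names)  # → ['search_db', 'format_results']
# Each name would then become a NextTask(name) in the TaskOutput
```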