General Introduction
Flow is a lightweight task engine for building AI agents, with an emphasis on simplicity and flexibility. Unlike traditional node- and edge-based workflows, Flow uses a dynamic task queue that supports parallel execution, dynamic scheduling, and intelligent dependency management. Its core idea is to keep complex workflows simple through parallel task execution, dynamic workflows, and conditional branching. Flow does not require predefined edges between nodes; its dynamic task scheduling architecture helps developers write cleaner, easier-to-understand code. Maintained by the Laminar team, Flow supports automated tracing and state management and suits a variety of AI application scenarios.
Feature List
- Parallel task execution: tasks run in parallel automatically, without explicit threading code.
- Dynamic scheduling: tasks can schedule new tasks at runtime.
- Intelligent dependency management: tasks can wait for the results of previous operations.
- State management: task state can be saved and loaded, and execution can start from a specific task.
- Conditional branching and control flow: conditional branches and loops are supported.
- Streaming task execution: tasks can stream intermediate results while they run.
- Automated tracing: OpenTelemetry-based automatic tracing for easier debugging and state reconstruction.
- Lightweight with no external dependencies: the design is simple, flexible, and powerful.
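To make the idea of a dynamic task queue without predefined edges concrete, here is a minimal sketch that uses only the Python standard library. It is not Flow's implementation: `run_tasks`, the `Task` signature, and the sample task names are hypothetical and exist only for illustration. Each task returns its output together with the names of the tasks to run next, and the scheduler submits those successors to a thread pool as soon as the task finishes.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from typing import Any, Callable, Dict, List, Tuple

# Hypothetical task signature for this sketch: context -> (output, next task names)
Task = Callable[[Dict[str, Any]], Tuple[Any, List[str]]]


def run_tasks(tasks: Dict[str, Task], start: str, max_workers: int = 4) -> Dict[str, Any]:
    """Run `start`, then whatever each finished task schedules next."""
    context: Dict[str, Any] = {}  # written only by the scheduling thread
    leaves: Dict[str, Any] = {}   # outputs of tasks that schedule nothing further
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        pending = {pool.submit(tasks[start], context): start}
        while pending:
            done, _ = wait(list(pending), return_when=FIRST_COMPLETED)
            for future in done:
                name = pending.pop(future)
                output, next_names = future.result()
                context[name] = output  # make the result visible to successors
                if not next_names:
                    leaves[name] = output
                for next_name in next_names:
                    pending[pool.submit(tasks[next_name], context)] = next_name
    return leaves


def fetch(ctx):
    return "fetched", ["parse", "log"]  # schedules two successors at runtime


def parse(ctx):
    return f"parsed after {ctx['fetch']}", []


def log(ctx):
    return "logged", []


result = run_tasks({"fetch": fetch, "parse": parse, "log": log}, "fetch")
print(result)
```

No graph is declared up front in this sketch: which tasks run is decided entirely by the return values at runtime.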
Usage
Installation
To install Flow, use pip:
pip install lmnr-flow
Basic use
Below is a simple example of how to use it:
from concurrent.futures import ThreadPoolExecutor
from lmnr_flow import Flow, TaskOutput, NextTask, Context

# Create a Flow instance
flow = Flow(thread_pool_executor=ThreadPoolExecutor(max_workers=4))

# Define a simple task
def my_task(context: Context) -> TaskOutput:
    return TaskOutput(output="Hello World!")

# Add the task to Flow
flow.add_task("greet", my_task)

# Run the task
result = flow.run("greet")
print(result)  # Output: {"greet": "Hello World!"}
Task chaining
Tasks can trigger other tasks:
def task1(context: Context) -> TaskOutput:
    # Schedule task2 to run after this task
    return TaskOutput(output="result1", next_tasks=[NextTask("task2")])

def task2(context: Context) -> TaskOutput:
    # Read the result stored by task1
    t1_result = context.get("task1")
    return TaskOutput(output="result2")

flow.add_task("task1", task1)
flow.add_task("task2", task2)
flow.run("task1")  # Output: {"task2": "result2"}
Parallel execution
Multiple tasks can be executed in parallel:
import time

def starter(context: Context) -> TaskOutput:
    # Schedule both slow tasks at once
    return TaskOutput(output="started", next_tasks=[NextTask("slow_task1"), NextTask("slow_task2")])

def slow_task1(context: Context) -> TaskOutput:
    time.sleep(1)
    return TaskOutput(output="result1")

def slow_task2(context: Context) -> TaskOutput:
    time.sleep(1)
    return TaskOutput(output="result2")

flow.add_task("starter", starter)
flow.add_task("slow_task1", slow_task1)
flow.add_task("slow_task2", slow_task2)
flow.run("starter")  # Both tasks run in parallel, so the total time is about 1 second
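The timing claim above can be checked independently of Flow. The sketch below uses only `concurrent.futures` from the standard library; the `slow` helper and the 0.5-second delay are choices made for this illustration. Two sleeping tasks submitted to a two-worker pool finish in roughly the time of one.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow(label: str, delay: float = 0.5) -> str:
    """Stand-in for a slow, blocking task (hypothetical helper)."""
    time.sleep(delay)
    return label


start = time.perf_counter()
with ThreadPoolExecutor(max_workers=2) as pool:
    # Both calls sleep at the same time on separate worker threads
    results = list(pool.map(slow, ["result1", "result2"]))
elapsed = time.perf_counter() - start

print(results, f"{elapsed:.2f}s")
```

Run sequentially, the same two calls would take at least one second in total; with two workers the elapsed time stays close to a single delay.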
Streaming results
Tasks can stream intermediate results while they run:
from lmnr_flow import StreamChunk  # assumed to be exported at the package top level

def streaming_task(context: Context) -> TaskOutput:
    # Stream intermediate results
    stream = context.get_stream()
    for i in range(3):
        # StreamChunk takes the task id and the chunk value
        stream.put(StreamChunk("streaming_task", f"interim_{i}"))
    return TaskOutput(output="final")

flow.add_task("streaming_task", streaming_task)

# Consume results as they arrive
for task_id, output in flow.stream("streaming_task"):
    print(f"{task_id}: {output}")
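The producer and consumer pattern behind this kind of streaming can be sketched with the standard library alone. The code below is an illustration and makes no claim about how Flow implements `get_stream` or `stream`; `stream_task` and the `_DONE` sentinel are hypothetical names. A worker thread puts chunks on a queue, and a generator yields them to the caller until it sees the sentinel.

```python
import queue
import threading
from typing import Any, Iterator, Tuple

_DONE = object()  # sentinel marking the end of the stream


def stream_task(task_id: str, n_chunks: int) -> Iterator[Tuple[str, Any]]:
    """Yield (task_id, chunk) pairs as a background worker produces them."""
    chunks: "queue.Queue[Any]" = queue.Queue()

    def worker() -> None:
        for i in range(n_chunks):
            chunks.put((task_id, f"interim_{i}"))  # intermediate results
        chunks.put((task_id, "final"))             # final result
        chunks.put(_DONE)

    threading.Thread(target=worker, daemon=True).start()
    while True:
        item = chunks.get()  # blocks until the worker produces something
        if item is _DONE:
            return
        yield item


received = list(stream_task("streaming_task", 3))
for task_id, output in received:
    print(f"{task_id}: {output}")
```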
Dynamic workflow
Tasks can be dynamically scheduled based on conditions:
def conditional_task(context: Context) -> TaskOutput:
    count = context.get("count", 0)
    # Stop once the counter reaches 3
    if count >= 3:
        return TaskOutput(output="done")
    # Otherwise increment the counter and schedule this task again
    context.set("count", count + 1)
    return TaskOutput(output=f"iteration_{count}", next_tasks=[NextTask("conditional_task")])

flow.add_task("conditional_task", conditional_task)
flow.run("conditional_task")  # The task reschedules itself 3 times, then returns "done"
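To trace how a self-rescheduling task terminates, the loop can be reproduced with a plain queue. This is a standard-library sketch, not Flow code: `run_loop` is a hypothetical single-threaded scheduler, and tasks here return a tuple of output and next task names instead of a `TaskOutput`.

```python
from collections import deque
from typing import Any, Callable, Dict, List, Tuple


def conditional_task(context: Dict[str, Any]) -> Tuple[str, List[str]]:
    count = context.get("count", 0)
    if count >= 3:
        return "done", []  # no successor: the loop ends here
    context["count"] = count + 1
    return f"iteration_{count}", ["conditional_task"]  # schedule itself again


def run_loop(tasks: Dict[str, Callable], start: str) -> List[str]:
    """Run tasks from a FIFO queue until nothing is scheduled (hypothetical helper)."""
    context: Dict[str, Any] = {}
    history: List[str] = []
    pending = deque([start])
    while pending:
        name = pending.popleft()
        output, next_names = tasks[name](context)
        history.append(output)
        pending.extend(next_names)
    return history


history = run_loop({"conditional_task": conditional_task}, "conditional_task")
print(history)
```

The task runs four times in total: three iterations that reschedule it, followed by a final run that returns "done".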
Input parameters
Tasks can receive input parameters:
def parameterized_task(context: Context) -> TaskOutput:
    # Inputs passed to flow.run are available through the context
    name = context.get("user_name")
    return TaskOutput(output=f"Hello {name}!")

flow.add_task("greet", parameterized_task)
result = flow.run("greet", inputs={"user_name": "Alice"})
print(result)  # Output: {"greet": "Hello Alice!"}
Dynamic routing
Tasks can be dynamically routed based on input:
def router(context: Context) -> TaskOutput:
    task_type = context.get("type")
    # Map each input type to the tasks that should run next
    routes = {
        "process": [NextTask("process_task")],
        "analyze": [NextTask("analyze_task")],
        "report": [NextTask("report_task")],
    }
    # An unknown type schedules no further tasks
    return TaskOutput(output=f"routing to {task_type}", next_tasks=routes.get(task_type, []))

flow.add_task("router", router)
flow.add_task("process_task", lambda ctx: TaskOutput("processed data"))
flow.run("router", inputs={"type": "process"})  # Output: {"process_task": "processed data"}
State management
Task states can be saved and loaded:
context = Context()
context.from_dict({"task1": "result1"})
flow = Flow(context=context)
flow.add_task("task2", lambda ctx: TaskOutput("result2"))
flow.run("task2")
assert flow.context.get("task1") == "result1"
assert flow.context.get("task2") == "result2"
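Persisting the state between runs is left open above. One possible approach, sketched here with the standard library only, is to store the state as a plain dictionary in a JSON file and restore it before resuming. `save_state` and `load_state` are hypothetical helpers, not part of Flow, and the sketch assumes the stored values are JSON-serializable.

```python
import json
import os
import tempfile
from typing import Any, Dict


def save_state(state: Dict[str, Any], path: str) -> None:
    """Write the task results to disk (hypothetical helper)."""
    with open(path, "w", encoding="utf-8") as f:
        json.dump(state, f)


def load_state(path: str) -> Dict[str, Any]:
    """Read previously saved task results (hypothetical helper)."""
    with open(path, encoding="utf-8") as f:
        return json.load(f)


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "state.json")
    save_state({"task1": "result1"}, path)  # an earlier run stopped after task1
    state = load_state(path)                # a later run restores that state

# Resume at task2, which can read the stored result of task1
state["task2"] = f"result2 (saw {state['task1']})"
print(state)
```

A dictionary restored this way could then be handed to `context.from_dict`, as in the example above.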
Map Reduce
Tasks can perform Map Reduce operations:
def task1(ctx):
    # Map step: create a shared collector and spawn three instances of task2
    ctx.set("collector", [])
    return TaskOutput("result1", next_tasks=[NextTask("task2", spawn_another=True) for _ in range(3)])

def task2(ctx):
    # Each instance appends its result to the shared collector
    collector = ctx.get("collector")
    collector.append("result2")
    ctx.set("collector", collector)
    return TaskOutput("", next_tasks=[NextTask("task3")])

def task3(ctx):
    # Reduce step: return everything that was collected
    collector = ctx.get("collector")
    return TaskOutput(collector)
flow.add_task("task1", task1)
flow.add_task("task2", task2)
flow.add_task("task3", task3)
result = flow.run("task1")
assert result == {"task3": ["result2", "result2", "result2"]}
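The same map and reduce shape can be written with the standard library, which also makes one design concern visible: when several workers update a shared collector, the read-modify-write step may need synchronization. The source does not say whether Flow's `Context` handles this internally, so the explicit lock below is an assumption of this sketch, and `map_step` is a hypothetical name.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import List

collector: List[int] = []
lock = threading.Lock()  # guards the shared collector


def map_step(item: int) -> None:
    """Map: compute a partial result and append it to the shared collector."""
    squared = item * item
    with lock:
        collector.append(squared)


with ThreadPoolExecutor(max_workers=3) as pool:
    # Consume the iterator so that all map steps have finished
    list(pool.map(map_step, [1, 2, 3]))

# Reduce: combine the partial results once every map step is done
total = sum(collector)
print(sorted(collector), total)
```

The order of items in `collector` depends on thread timing, so the sketch sorts before printing.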
LLM Agent
Example of an LLM agent for dynamic tool selection:
def llm_agent(context: Context) -> TaskOutput:
    prompt = context.get("user_input")
    # Simulated LLM response that decides which tools to call
    llm_response = {
        "reasoning": "Need to search database and format results",
        "tools": ["search_db", "format_results"],
    }
    # Schedule the selected tools (assumed to be registered as tasks elsewhere)
    next_tasks = [NextTask(tool) for tool in llm_response["tools"]]
    return TaskOutput(output="LLM agent response", next_tasks=next_tasks)

flow.add_task("llm_agent", llm_agent)
flow.run("llm_agent", inputs={"user_input": "Find data"})
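Because the tool names come from model output, it can help to check them against the registered tools before scheduling anything. The sketch below is independent of Flow and uses hypothetical names throughout: `TOOLS`, `simulated_llm`, `select_tools`, and the extra `send_email` tool are invented for this illustration.

```python
from typing import Any, Callable, Dict, List, Tuple

# Hypothetical tool registry for this sketch
TOOLS: Dict[str, Callable[[str], str]] = {
    "search_db": lambda query: f"rows matching {query!r}",
    "format_results": lambda query: f"formatted report for {query!r}",
}


def simulated_llm(prompt: str) -> Dict[str, Any]:
    """Stand-in for a real model call; always returns the same plan."""
    return {
        "reasoning": "Need to search database and format results",
        "tools": ["search_db", "format_results", "send_email"],
    }


def select_tools(requested: List[str], registry: Dict[str, Any]) -> Tuple[List[str], List[str]]:
    """Split requested tool names into registered and unknown ones."""
    valid = [name for name in requested if name in registry]
    unknown = [name for name in requested if name not in registry]
    return valid, unknown


response = simulated_llm("Find data")
valid, unknown = select_tools(response["tools"], TOOLS)
outputs = [TOOLS[name]("Find data") for name in valid]  # run only known tools
print(valid, unknown, outputs)
```

In a Flow task, the `valid` list would be the one turned into `NextTask` entries, while the `unknown` names could be logged or fed back to the model.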