
Flow (Laminar): a lightweight task engine for building AI agents with simple, flexible task management

General Introduction

Flow is a lightweight task engine for building AI agents, with an emphasis on simplicity and flexibility. Unlike traditional node-and-edge workflows, Flow uses a dynamic task queue that supports parallel execution, dynamic scheduling, and intelligent dependency management. Its core idea is to keep complex workflows simple through parallel task execution, dynamic workflows, and conditional branching. Flow does not require predefined edges between nodes; tasks are scheduled dynamically, which helps developers write cleaner, easier-to-understand code. Flow is maintained by the Laminar team, supports automated tracing and state management, and suits a variety of AI application scenarios.
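
To make the contrast with edge-based graphs concrete, here is a minimal standard-library sketch of a dynamic task queue. It is not Flow's implementation: `run_queue`, the `(output, next_task_names)` return tuple, and the task names `fetch` and `summarize` are hypothetical, chosen only to show that each task decides at runtime what runs next. The sketch runs tasks one at a time, whereas Flow is described as running them in parallel.

```python
from collections import deque
from typing import Callable, Dict, List, Tuple

# Hypothetical task signature: takes the shared context,
# returns (output, names of the tasks to run next).
Task = Callable[[dict], Tuple[object, List[str]]]

def run_queue(tasks: Dict[str, Task], start: str) -> dict:
    """Run tasks from a queue; each task names its successors at runtime."""
    context: dict = {}
    queue = deque([start])
    while queue:
        name = queue.popleft()
        output, next_names = tasks[name](context)
        context[name] = output      # store the result under the task's name
        queue.extend(next_names)    # no edges were declared in advance
    return context

def fetch(ctx: dict):
    return "raw data", ["summarize"]

def summarize(ctx: dict):
    # Reads the earlier task's result from the shared context.
    return f"summary of {ctx['fetch']}", []

results = run_queue({"fetch": fetch, "summarize": summarize}, "fetch")
print(results)
```

The only wiring is the list of names each task returns, so adding a branch or a loop means changing a return value rather than editing a graph definition.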

 

Feature List

  • Parallel task execution: tasks run in parallel automatically, with no explicit threading code.
  • Dynamic scheduling: tasks can schedule new tasks at runtime.
  • Intelligent dependency management: tasks can wait for the results of earlier tasks.
  • State management: task state can be saved and loaded, and execution can start from a specific task.
  • Conditional branching and control flow: conditional branches and loops are supported.
  • Streaming task execution: intermediate results can be streamed while tasks run.
  • Automated tracing: OpenTelemetry-based tracing is supported for easier debugging and state reconstruction.
  • Lightweight with no external dependencies: the design is simple, flexible, and powerful.

 

Usage Guide

Installation

To install Flow, simply use the pip command:

pip install lmnr-flow

Basic use

Below is a simple example of how to use it:

from concurrent.futures import ThreadPoolExecutor
from lmnr_flow import Flow, TaskOutput, NextTask, Context

# Create a Flow instance
flow = Flow(thread_pool_executor=ThreadPoolExecutor(max_workers=4))

# Define a simple task
def my_task(context: Context) -> TaskOutput:
    return TaskOutput(output="Hello World!")

# Add the task to the Flow
flow.add_task("greet", my_task)

# Run the task
result = flow.run("greet")
print(result)  # Output: {"greet": "Hello World!"}

Task chaining

Tasks can trigger other tasks:

def task1(context: Context) -> TaskOutput:
    # Schedule task2 to run after this task
    return TaskOutput(output="result1", next_tasks=[NextTask("task2")])

def task2(context: Context) -> TaskOutput:
    # Read the result of task1 from the shared context
    t1_result = context.get("task1")
    return TaskOutput(output="result2")

flow.add_task("task1", task1)
flow.add_task("task2", task2)
flow.run("task1")  # Output: {"task2": "result2"}

Parallel execution

Multiple tasks can be executed in parallel:

import time

def starter(context: Context) -> TaskOutput:
    # Schedule both slow tasks at once
    return TaskOutput(output="started", next_tasks=[NextTask("slow_task1"), NextTask("slow_task2")])

def slow_task1(context: Context) -> TaskOutput:
    time.sleep(1)
    return TaskOutput(output="result1")

def slow_task2(context: Context) -> TaskOutput:
    time.sleep(1)
    return TaskOutput(output="result2")

flow.add_task("starter", starter)
flow.add_task("slow_task1", slow_task1)
flow.add_task("slow_task2", slow_task2)
flow.run("starter")  # The two slow tasks run in parallel, so the total time is about 1 second
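
The timing claim can be checked without Flow installed. The following sketch uses only the standard library's `ThreadPoolExecutor`, the same executor type passed to `Flow` in the basic example. The `slow` helper and the shortened `DELAY` are assumptions made for illustration. It shows why two sleeping tasks finish in roughly the time of one.

```python
import time
from concurrent.futures import ThreadPoolExecutor

DELAY = 0.25  # seconds; shortened stand-in for the 1-second sleeps

def slow(name: str) -> str:
    time.sleep(DELAY)  # sleeping releases the GIL, so threads overlap
    return f"{name} done"

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as pool:
    # map() submits both calls immediately and yields results in input order
    outputs = list(pool.map(slow, ["slow_task1", "slow_task2"]))
elapsed = time.perf_counter() - start

print(outputs)
print(f"elapsed: {elapsed:.2f}s (serial would be about {2 * DELAY:.2f}s)")
```

Threads help here because the work is waiting (sleep, network, LLM calls). CPU-bound Python code would not see the same speedup from a thread pool.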

Streaming results

Tasks can stream intermediate results while they run:

from lmnr_flow import StreamChunk  # assumes StreamChunk is exported at the package top level

def streaming_task(context: Context) -> TaskOutput:
    stream = context.get_stream()
    for i in range(3):
        # Each chunk carries the task id and an intermediate value
        stream.put(StreamChunk("streaming_task", f"interim_{i}"))
    return TaskOutput(output="final")

flow.add_task("streaming_task", streaming_task)
for task_id, output in flow.stream("streaming_task"):
    print(f"{task_id}: {output}")
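
The producer/consumer pattern behind streaming can be sketched with the standard library alone. This is an illustration under stated assumptions, not Flow's internals: `producer`, `consume`, and the `SENTINEL` end marker are hypothetical names, and plain `(task_id, value)` tuples stand in for stream chunks.

```python
import queue
import threading

SENTINEL = object()  # hypothetical marker for the end of the stream

def producer(stream: queue.Queue) -> None:
    # Emit intermediate chunks, then the final value, then the end marker.
    for i in range(3):
        stream.put(("streaming_task", f"interim_{i}"))
    stream.put(("streaming_task", "final"))
    stream.put(SENTINEL)

def consume(stream: queue.Queue):
    """Yield chunks as they arrive until the sentinel is seen."""
    while True:
        item = stream.get()  # blocks until the producer puts something
        if item is SENTINEL:
            break
        yield item

stream = queue.Queue()
worker = threading.Thread(target=producer, args=(stream,))
worker.start()
chunks = list(consume(stream))
worker.join()

for task_id, output in chunks:
    print(f"{task_id}: {output}")
```

Because the consumer blocks on `get()`, it can process each chunk as soon as it is produced instead of waiting for the task to finish.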

Dynamic workflow

Tasks can be dynamically scheduled based on conditions:

def conditional_task(context: Context) -> TaskOutput:
    count = context.get("count", 0)
    # Stop rescheduling once the counter reaches 3
    if count >= 3:
        return TaskOutput(output="done")
    context.set("count", count + 1)
    # Otherwise the task schedules itself again
    return TaskOutput(output=f"iteration_{count}", next_tasks=[NextTask("conditional_task")])

flow.add_task("conditional_task", conditional_task)
flow.run("conditional_task")  # The task completes after 3 iterations

Input parameters

Tasks can receive input parameters:

def parameterized_task(context: Context) -> TaskOutput:
    # Inputs passed to flow.run() are available in the context
    name = context.get("user_name")
    return TaskOutput(output=f"Hello {name}!")

flow.add_task("greet", parameterized_task)
result = flow.run("greet", inputs={"user_name": "Alice"})
print(result)  # Output: {"greet": "Hello Alice!"}

Dynamic routing

Tasks can be dynamically routed based on input:

def router(context: Context) -> TaskOutput:
    task_type = context.get("type")
    # Map each input type to the task(s) that should run next
    routes = {
        "process": [NextTask("process_task")],
        "analyze": [NextTask("analyze_task")],
        "report": [NextTask("report_task")]
    }
    # Unknown types fall back to an empty list, so no follow-up task is scheduled
    return TaskOutput(output=f"routing to {task_type}", next_tasks=routes.get(task_type, []))

flow.add_task("router", router)
flow.add_task("process_task", lambda ctx: TaskOutput("processed data"))
flow.run("router", inputs={"type": "process"})  # Output: {"process_task": "processed data"}
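
The router relies on dictionary dispatch with a default. A standalone sketch of that pattern follows. The handler functions, the `ROUTES` table, and the explicit `unknown` fallback are hypothetical additions for illustration, and they report an unrecognized type rather than silently scheduling nothing.

```python
from typing import Callable, Dict

def process(data: str) -> str:
    return f"processed {data}"

def analyze(data: str) -> str:
    return f"analyzed {data}"

def unknown(data: str) -> str:
    # Hypothetical fallback so an unrecognized type is reported, not dropped.
    return "no handler for this type"

ROUTES: Dict[str, Callable[[str], str]] = {"process": process, "analyze": analyze}

def route(task_type: str, data: str) -> str:
    # dict.get() returns the fallback when the key is missing
    handler = ROUTES.get(task_type, unknown)
    return handler(data)

known = route("process", "data")
missing = route("archive", "data")
print(known)
print(missing)
```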

State management

Task states can be saved and loaded:

context = Context()
context.from_dict({"task1": "result1"})
flow = Flow(context=context)
flow.add_task("task2", lambda ctx: TaskOutput("result2"))
flow.run("task2")
assert flow.context.get("task1") == "result1"
assert flow.context.get("task2") == "result2"
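
To persist state between runs, a dictionary of task results can be written to disk and read back. The sketch below uses only the standard library and assumes that task outputs are JSON-serializable. `save_state` and `load_state` are hypothetical helpers, not part of Flow. A dictionary restored this way could then be handed to `context.from_dict`.

```python
import json
import os
import tempfile

def save_state(state: dict, path: str) -> None:
    """Write task results to a JSON file."""
    with open(path, "w", encoding="utf-8") as f:
        json.dump(state, f)

def load_state(path: str) -> dict:
    """Read task results back from a JSON file."""
    with open(path, encoding="utf-8") as f:
        return json.load(f)

state = {"task1": "result1"}
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "state.json")
    save_state(state, path)
    restored = load_state(path)

print(restored)
```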

Map Reduce

Tasks can perform Map Reduce operations:

def task1(ctx):
    ctx.set("collector", [])
    # Map step: spawn three instances of task2
    return TaskOutput("result1", next_tasks=[NextTask("task2", spawn_another=True) for _ in range(3)])

def task2(ctx):
    # Each instance appends its result to the shared collector
    collector = ctx.get("collector")
    collector.append("result2")
    ctx.set("collector", collector)
    return TaskOutput("", next_tasks=[NextTask("task3")])

def task3(ctx):
    # Reduce step: return everything that was collected
    collector = ctx.get("collector")
    return TaskOutput(collector)

flow.add_task("task1", task1)
flow.add_task("task2", task2)
flow.add_task("task3", task3)
result = flow.run("task1")
assert result == {"task3": ["result2", "result2", "result2"]}
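
The same fan-out and fan-in shape can be written with the standard library. This sketch is an illustration rather than Flow's mechanism: `map_task` and the explicit `lock` are assumptions. The lock is included because the source does not say how concurrent updates to a shared collector are coordinated.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

collector = []
lock = threading.Lock()

def map_task(item: int) -> None:
    result = item * item
    with lock:  # guard the shared list against concurrent writers
        collector.append(result)

with ThreadPoolExecutor(max_workers=3) as pool:
    # Fan out; consuming the iterator waits for every worker to finish
    list(pool.map(map_task, [1, 2, 3]))

# Reduce step runs once, after the whole map phase has completed
total = sum(collector)
print(sorted(collector), total)
```

The append order depends on thread scheduling, which is why the result is sorted before printing.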

LLM Agent

Example of an LLM agent for dynamic tool selection:

from typing import List
import json

def llm_agent(context: Context) -> TaskOutput:
    prompt = context.get("user_input")
    # Mock LLM response; a real agent would call a model with `prompt` here
    llm_response = {
        "reasoning": "Need to search database and format results",
        "tools": ["search_db", "format_results"]
    }
    # Schedule one follow-up task per tool the LLM selected
    next_tasks = [NextTask(tool) for tool in llm_response["tools"]]
    return TaskOutput(output="LLM agent response", next_tasks=next_tasks)

# Assumes tasks named "search_db" and "format_results" were added with flow.add_task
flow.add_task("llm_agent", llm_agent)
flow.run("llm_agent", inputs={"user_input": "Find data"})
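
With a real model, the tool list arrives as text and may be malformed or name tools that do not exist. A minimal sketch of parsing and validating it before scheduling tasks follows. `REGISTERED_TOOLS`, `select_tools`, and the sample reply are hypothetical and stand in for whatever registry and model output an application actually has.

```python
import json
from typing import List

# Hypothetical registry of tool names that have matching tasks
REGISTERED_TOOLS = {"search_db", "format_results"}

def select_tools(raw_response: str) -> List[str]:
    """Parse a JSON LLM reply and keep only tools that are registered."""
    try:
        parsed = json.loads(raw_response)
    except json.JSONDecodeError:
        return []  # unparseable reply: schedule nothing
    if not isinstance(parsed, dict):
        return []
    tools = parsed.get("tools", [])
    if not isinstance(tools, list):
        return []
    # Drop non-string entries and names with no registered task
    return [t for t in tools if isinstance(t, str) and t in REGISTERED_TOOLS]

raw = '{"reasoning": "Need to search database", "tools": ["search_db", "delete_db"]}'
selected = select_tools(raw)
print(selected)
```

Filtering against a registry keeps an unexpected tool name from being scheduled as a task that was never defined.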

May not be reproduced without permission: Chief AI Sharing Circle » Flow (Laminar): a lightweight task engine for building AI agents with simple, flexible task management
