Flow (Laminar): легкий движок для построения интеллекта, который упрощает и гибко управляет задачами
Общее введение
Flow - это легкий движок задач, предназначенный для создания агентов искусственного интеллекта с акцентом на простоту и гибкость. В отличие от традиционных рабочих процессов, основанных на узлах и границах, Flow использует динамическую систему очередей задач, которая поддерживает параллельное выполнение, динамическое планирование и интеллектуальное управление зависимостями. Основная концепция Flow - сделать сложные рабочие процессы простыми и легкими за счет параллельного выполнения задач, динамических рабочих процессов и условного управления ветвлениями. Flow не требует предопределенных границ между узлами и использует архитектуру динамического планирования задач, чтобы помочь разработчикам писать более чистый и простой для понимания код. Flow поддерживается командой Laminar, поддерживает автоматическое отслеживание и управление состоянием и подходит для различных сценариев применения ИИ.
Список функций
- Параллельное выполнение задач: автоматическое параллельное выполнение задач без явного кода потоков.
- Динамическое планирование: задачи могут планировать новые задачи во время выполнения.
- Интеллектуальное управление зависимостями: задачи могут ждать результата предыдущей операции.
- Управление состояниями: сохранение и загрузка состояний задач, начиная с выполнения конкретной задачи.
- Условное ветвление и поток управления: поддерживаются условное ветвление и управление циклом.
- Потоковое выполнение задач: поддержка потокового выполнения задач.
- Автоматизированная трассировка: поддерживает автоматизированную трассировку OpenTelemetry для упрощения отладки и восстановления состояния.
- Легкий вес и отсутствие внешних зависимостей: простой, гибкий и мощный дизайн.
Использование помощи
Процесс установки
Чтобы установить Flow, просто воспользуйтесь командой pip:
pip install lmnr-flow
Основное использование
Ниже приведен простой пример его использования:
from concurrent.futures import ThreadPoolExecutor
from lmnr_flow import Flow, TaskOutput, NextTask, Context
# 创建Flow实例
flow = Flow(thread_pool_executor=ThreadPoolExecutor(max_workers=4))
# 定义一个简单任务
def my_task(context: Context) -> TaskOutput:
return TaskOutput(output="Hello World!")
# 添加任务到Flow
flow.add_task("greet", my_task)
# 运行任务
result = flow.run("greet")
print(result) # 输出: {"greet": "Hello World!"}
цепочка миссий
Квесты могут вызывать другие квесты:
def task1(context: Context) -> TaskOutput:
return TaskOutput(output="result1", next_tasks=[NextTask("task2")])
def task2(context: Context) -> TaskOutput:
t1_result = context.get("task1")
return TaskOutput(output="result2")
flow.add_task("task1", task1)
flow.add_task("task2", task2)
flow.run("task1") # 输出: {"task2": "result2"}
параллельное выполнение
Несколько задач могут выполняться параллельно:
def starter(context: Context) -> TaskOutput:
return TaskOutput(output="started", next_tasks=[NextTask("slow_task1"), NextTask("slow_task2")])
def slow_task1(context: Context) -> TaskOutput:
time.sleep(1)
return TaskOutput(output="result1")
def slow_task2(context: Context) -> TaskOutput:
time.sleep(1)
return TaskOutput(output="result2")
flow.add_task("starter", starter)
flow.add_task("slow_task1", slow_task1)
flow.add_task("slow_task2", slow_task2)
flow.run("starter") # 两个任务并行执行,总耗时约1秒
Результаты потокового вещания
Задачи могут быть потоковыми, чтобы возвращать промежуточные результаты:
def streaming_task(context: Context) -> TaskOutput:
stream = context.get_stream()
for i in range(3):
stream.put(StreamChunk("streaming_task", f"interim_{i}"))
return TaskOutput(output="final")
flow.add_task("streaming_task", streaming_task)
for task_id, output in flow.stream("streaming_task"):
print(f"{task_id}: {output}")
Динамический рабочий процесс
Задачи можно динамически планировать в зависимости от условий:
def conditional_task(context: Context) -> TaskOutput:
count = context.get("count", 0)
if count >= 3:
return TaskOutput(output="done")
context.set("count", count + 1)
return TaskOutput(output=f"iteration_{count}", next_tasks=[NextTask("conditional_task")])
flow.add_task("conditional_task", conditional_task)
flow.run("conditional_task") # 任务循环3次后完成
входной параметр
Задачи могут получать входные параметры:
def parameterized_task(context: Context) -> TaskOutput:
name = context.get("user_name")
return TaskOutput(output=f"Hello {name}!")
flow.add_task("greet", parameterized_task)
result = flow.run("greet", inputs={"user_name": "Alice"})
print(result) # 输出: {"greet": "Hello Alice!"}
динамическая маршрутизация
Задачи можно динамически направлять на основе исходных данных:
def router(context: Context) -> TaskOutput:
task_type = context.get("type")
routes = {
"process": [NextTask("process_task")],
"analyze": [NextTask("analyze_task")],
"report": [NextTask("report_task")]
}
return TaskOutput(output=f"routing to {task_type}", next_tasks=routes.get(task_type, []))
flow.add_task("router", router)
flow.add_task("process_task", lambda ctx: TaskOutput("processed data"))
flow.run("router", inputs={"type": "process"}) # 输出: {"process_task": "processed data"}
Управление состоянием
Состояния задач можно сохранять и загружать:
context = Context()
context.from_dict({"task1": "result1"})
flow = Flow(context=context)
flow.add_task("task2", lambda ctx: TaskOutput("result2"))
flow.run("task2")
assert flow.context.get("task1") == "result1"
assert flow.context.get("task2") == "result2"
Map Reduce
Задачи могут выполнять операции Map Reduce:
def task1(ctx):
ctx.set("collector", [])
return TaskOutput("result1", next_tasks=[NextTask("task2", spawn_another=True) for _ in range(3)])
def task2(ctx):
collector = ctx.get("collector")
collector.append("result2")
ctx.set("collector", collector)
return TaskOutput("", next_tasks=[NextTask("task3")])
def task3(ctx):
collector = ctx.get("collector")
return TaskOutput(collector)
flow.add_task("task1", task1)
flow.add_task("task2", task2)
flow.add_task("task3", task3)
result = flow.run("task1")
assert result == {"task3": ["result2", "result2", "result2"]}
Агент LLM
Пример LLM-агента для динамического выбора инструмента:
from typing import List
import json
def llm_agent(context: Context) -> TaskOutput:
prompt = context.get("user_input")
llm_response = {
"reasoning": "Need to search database and format results",
"tools": ["search_db", "format_results"]
}
next_tasks = [NextTask(tool) for tool in llm_response["tools"]]
return TaskOutput(output="LLM agent response", next_tasks=next_tasks)
flow.add_task("llm_agent", llm_agent)
flow.run("llm_agent", inputs={"user_input": "Find data"})
© заявление об авторских правах
Авторское право на статью Круг обмена ИИ Пожалуйста, не воспроизводите без разрешения.
Похожие статьи
Нет комментариев...