Flow (Laminar): un motor de tareas ligero para construir inteligencias que simplifica y gestiona con flexibilidad las tareas.
Últimos recursos sobre IAActualizado hace 10 meses Círculo de intercambio de inteligencia artificial 19.9K 00
Introducción general
Flow es un motor de tareas ligero diseñado para construir agentes de IA con un énfasis en la simplicidad y la flexibilidad. A diferencia de los flujos de trabajo tradicionales basados en nodos y bordes, Flow utiliza un sistema de cola de tareas dinámico que admite la ejecución paralela, la programación dinámica y la gestión inteligente de dependencias. Flow no requiere bordes predefinidos entre los nodos y adopta una arquitectura de programación dinámica de tareas para ayudar a los desarrolladores a escribir un código más limpio y fácil de entender. Flow, mantenido por el equipo de Laminar, admite el seguimiento automatizado y la gestión de estados, y es adecuado para una gran variedad de escenarios de aplicaciones de IA.
Lista de funciones
- Ejecución de tareas en paralelo: ejecute automáticamente tareas en paralelo sin código de subprocesamiento explícito.
- Programación dinámica: las tareas pueden programar nuevas tareas en tiempo de ejecución.
- Gestión inteligente de las dependencias: las tareas pueden esperar el resultado de una operación anterior.
- Gestión de estados: guardar y cargar estados de tareas, a partir de la ejecución de una tarea específica.
- Bifurcación condicional y flujo de control: se admiten la bifurcación condicional y el control de bucle.
- Ejecución de tareas en streaming: admite la ejecución de tareas en streaming.
- Seguimiento automatizado: Soporta el seguimiento automatizado de OpenTelemetry para facilitar la depuración y la reconstrucción del estado.
- Ligero y sin dependencias externas: el diseño es sencillo, flexible y potente.
Utilizar la ayuda
Proceso de instalación
Para instalar Flow, basta con utilizar el comando pip:
pip install lmnr-flow
Uso básico
A continuación se muestra un sencillo ejemplo de su uso:
from concurrent.futures import ThreadPoolExecutor
from lmnr_flow import Flow, TaskOutput, NextTask, Context
# 创建Flow实例
flow = Flow(thread_pool_executor=ThreadPoolExecutor(max_workers=4))
# 定义一个简单任务
def my_task(context: Context) -> TaskOutput:
return TaskOutput(output="Hello World!")
# 添加任务到Flow
flow.add_task("greet", my_task)
# 运行任务
result = flow.run("greet")
print(result) # 输出: {"greet": "Hello World!"}
cadena de misiones
Las misiones pueden desencadenar otras misiones:
def task1(context: Context) -> TaskOutput:
return TaskOutput(output="result1", next_tasks=[NextTask("task2")])
def task2(context: Context) -> TaskOutput:
t1_result = context.get("task1")
return TaskOutput(output="result2")
flow.add_task("task1", task1)
flow.add_task("task2", task2)
flow.run("task1") # 输出: {"task2": "result2"}
ejecución paralela
Se pueden ejecutar varias tareas en paralelo:
def starter(context: Context) -> TaskOutput:
return TaskOutput(output="started", next_tasks=[NextTask("slow_task1"), NextTask("slow_task2")])
def slow_task1(context: Context) -> TaskOutput:
time.sleep(1)
return TaskOutput(output="result1")
def slow_task2(context: Context) -> TaskOutput:
time.sleep(1)
return TaskOutput(output="result2")
flow.add_task("starter", starter)
flow.add_task("slow_task1", slow_task1)
flow.add_task("slow_task2", slow_task2)
flow.run("starter") # 两个任务并行执行,总耗时约1秒
Resultados del streaming
Las tareas pueden transmitirse para devolver resultados intermedios:
def streaming_task(context: Context) -> TaskOutput:
stream = context.get_stream()
for i in range(3):
stream.put(StreamChunk("streaming_task", f"interim_{i}"))
return TaskOutput(output="final")
flow.add_task("streaming_task", streaming_task)
for task_id, output in flow.stream("streaming_task"):
print(f"{task_id}: {output}")
Flujo de trabajo dinámico
Las tareas pueden programarse dinámicamente en función de las condiciones:
def conditional_task(context: Context) -> TaskOutput:
count = context.get("count", 0)
if count >= 3:
return TaskOutput(output="done")
context.set("count", count + 1)
return TaskOutput(output=f"iteration_{count}", next_tasks=[NextTask("conditional_task")])
flow.add_task("conditional_task", conditional_task)
flow.run("conditional_task") # 任务循环3次后完成
parámetro de entrada
Las tareas pueden recibir parámetros de entrada:
def parameterized_task(context: Context) -> TaskOutput:
name = context.get("user_name")
return TaskOutput(output=f"Hello {name}!")
flow.add_task("greet", parameterized_task)
result = flow.run("greet", inputs={"user_name": "Alice"})
print(result) # 输出: {"greet": "Hello Alice!"}
enrutamiento dinámico
Las tareas pueden enrutarse dinámicamente en función de las entradas:
def router(context: Context) -> TaskOutput:
task_type = context.get("type")
routes = {
"process": [NextTask("process_task")],
"analyze": [NextTask("analyze_task")],
"report": [NextTask("report_task")]
}
return TaskOutput(output=f"routing to {task_type}", next_tasks=routes.get(task_type, []))
flow.add_task("router", router)
flow.add_task("process_task", lambda ctx: TaskOutput("processed data"))
flow.run("router", inputs={"type": "process"}) # 输出: {"process_task": "processed data"}
Gestión de la situación
Los estados de las tareas pueden guardarse y cargarse:
context = Context()
context.from_dict({"task1": "result1"})
flow = Flow(context=context)
flow.add_task("task2", lambda ctx: TaskOutput("result2"))
flow.run("task2")
assert flow.context.get("task1") == "result1"
assert flow.context.get("task2") == "result2"
Map Reducir
Las tareas pueden realizar operaciones Map Reduce:
def task1(ctx):
ctx.set("collector", [])
return TaskOutput("result1", next_tasks=[NextTask("task2", spawn_another=True) for _ in range(3)])
def task2(ctx):
collector = ctx.get("collector")
collector.append("result2")
ctx.set("collector", collector)
return TaskOutput("", next_tasks=[NextTask("task3")])
def task3(ctx):
collector = ctx.get("collector")
return TaskOutput(collector)
flow.add_task("task1", task1)
flow.add_task("task2", task2)
flow.add_task("task3", task3)
result = flow.run("task1")
assert result == {"task3": ["result2", "result2", "result2"]}
Agente LLM
Ejemplo de agente LLM para la selección dinámica de herramientas:
from typing import List
import json
def llm_agent(context: Context) -> TaskOutput:
prompt = context.get("user_input")
llm_response = {
"reasoning": "Need to search database and format results",
"tools": ["search_db", "format_results"]
}
next_tasks = [NextTask(tool) for tool in llm_response["tools"]]
return TaskOutput(output="LLM agent response", next_tasks=next_tasks)
flow.add_task("llm_agent", llm_agent)
flow.run("llm_agent", inputs={"user_input": "Find data"})
© declaración de copyright
Derechos de autor del artículo Círculo de intercambio de inteligencia artificial Todos, por favor no reproducir sin permiso.
Artículos relacionados
Sin comentarios...