Flow (Laminar) : un moteur de tâches léger pour la construction d'intelligences qui simplifie et gère les tâches de manière flexible.
Introduction générale
Flow est un moteur de tâches léger conçu pour construire des agents d'intelligence artificielle en mettant l'accent sur la simplicité et la flexibilité. Contrairement aux flux de travail traditionnels basés sur les nœuds et les bords, Flow utilise un système de file d'attente dynamique qui prend en charge l'exécution parallèle, l'ordonnancement dynamique et la gestion intelligente des dépendances. Son concept de base est de rendre les flux de travail complexes simples et faciles grâce à l'exécution parallèle des tâches, aux flux de travail dynamiques et au contrôle des branchements conditionnels.Flow ne nécessite pas de bords prédéfinis entre les nœuds et adopte une architecture d'ordonnancement dynamique des tâches pour aider les développeurs à écrire un code plus propre et facile à comprendre.Maintenu par l'équipe de Laminar, Flow prend en charge le traçage automatisé et la gestion de l'état, et convient à une variété de scénarios d'application de l'intelligence artificielle.
Liste des fonctions
- Exécution de tâches en parallèle : Exécution automatique de tâches en parallèle sans code de threading explicite.
- Planification dynamique : les tâches peuvent planifier de nouvelles tâches au moment de l'exécution.
- Gestion intelligente des dépendances : les tâches peuvent attendre le résultat d'une opération précédente.
- Gestion des états : sauvegarde et chargement des états des tâches, à partir de l'exécution d'une tâche spécifique.
- Branchement conditionnel et flux de contrôle : le branchement conditionnel et le contrôle en boucle sont pris en charge.
- Exécution de tâches en continu : prend en charge l'exécution de tâches en continu.
- Traçage automatisé : prend en charge le traçage automatisé d'OpenTelemetry pour faciliter le débogage et la reconstruction de l'état.
- Léger et sans dépendance externe : la conception est simple, flexible et puissante.
Utiliser l'aide
Processus d'installation
Pour installer Flow, il suffit d'utiliser la commande pip :
pip install lmnr-flow
Utilisation de base
Voici un exemple simple de son utilisation :
from concurrent.futures import ThreadPoolExecutor
from lmnr_flow import Flow, TaskOutput, NextTask, Context
# 创建Flow实例
flow = Flow(thread_pool_executor=ThreadPoolExecutor(max_workers=4))
# 定义一个简单任务
def my_task(context: Context) -> TaskOutput:
return TaskOutput(output="Hello World!")
# 添加任务到Flow
flow.add_task("greet", my_task)
# 运行任务
result = flow.run("greet")
print(result) # 输出: {"greet": "Hello World!"}
chaîne de mission
Les quêtes peuvent déclencher d'autres quêtes :
def task1(context: Context) -> TaskOutput:
return TaskOutput(output="result1", next_tasks=[NextTask("task2")])
def task2(context: Context) -> TaskOutput:
t1_result = context.get("task1")
return TaskOutput(output="result2")
flow.add_task("task1", task1)
flow.add_task("task2", task2)
flow.run("task1") # 输出: {"task2": "result2"}
exécution en parallèle
Plusieurs tâches peuvent être exécutées en parallèle :
def starter(context: Context) -> TaskOutput:
return TaskOutput(output="started", next_tasks=[NextTask("slow_task1"), NextTask("slow_task2")])
def slow_task1(context: Context) -> TaskOutput:
time.sleep(1)
return TaskOutput(output="result1")
def slow_task2(context: Context) -> TaskOutput:
time.sleep(1)
return TaskOutput(output="result2")
flow.add_task("starter", starter)
flow.add_task("slow_task1", slow_task1)
flow.add_task("slow_task2", slow_task2)
flow.run("starter") # 两个任务并行执行,总耗时约1秒
Résultats de la diffusion en continu
Les tâches peuvent être transmises en continu afin de renvoyer des résultats intermédiaires :
def streaming_task(context: Context) -> TaskOutput:
stream = context.get_stream()
for i in range(3):
stream.put(StreamChunk("streaming_task", f"interim_{i}"))
return TaskOutput(output="final")
flow.add_task("streaming_task", streaming_task)
for task_id, output in flow.stream("streaming_task"):
print(f"{task_id}: {output}")
Flux de travail dynamique
Les tâches peuvent être planifiées de manière dynamique en fonction de certaines conditions :
def conditional_task(context: Context) -> TaskOutput:
count = context.get("count", 0)
if count >= 3:
return TaskOutput(output="done")
context.set("count", count + 1)
return TaskOutput(output=f"iteration_{count}", next_tasks=[NextTask("conditional_task")])
flow.add_task("conditional_task", conditional_task)
flow.run("conditional_task") # 任务循环3次后完成
paramètre d'entrée
Les tâches peuvent recevoir des paramètres d'entrée :
def parameterized_task(context: Context) -> TaskOutput:
name = context.get("user_name")
return TaskOutput(output=f"Hello {name}!")
flow.add_task("greet", parameterized_task)
result = flow.run("greet", inputs={"user_name": "Alice"})
print(result) # 输出: {"greet": "Hello Alice!"}
routage dynamique
Les tâches peuvent être acheminées de manière dynamique en fonction des données fournies :
def router(context: Context) -> TaskOutput:
task_type = context.get("type")
routes = {
"process": [NextTask("process_task")],
"analyze": [NextTask("analyze_task")],
"report": [NextTask("report_task")]
}
return TaskOutput(output=f"routing to {task_type}", next_tasks=routes.get(task_type, []))
flow.add_task("router", router)
flow.add_task("process_task", lambda ctx: TaskOutput("processed data"))
flow.run("router", inputs={"type": "process"}) # 输出: {"process_task": "processed data"}
Gestion du statut
Les états des tâches peuvent être sauvegardés et chargés :
context = Context()
context.from_dict({"task1": "result1"})
flow = Flow(context=context)
flow.add_task("task2", lambda ctx: TaskOutput("result2"))
flow.run("task2")
assert flow.context.get("task1") == "result1"
assert flow.context.get("task2") == "result2"
Map Reduce
Les tâches peuvent effectuer des opérations Map Reduce :
def task1(ctx):
ctx.set("collector", [])
return TaskOutput("result1", next_tasks=[NextTask("task2", spawn_another=True) for _ in range(3)])
def task2(ctx):
collector = ctx.get("collector")
collector.append("result2")
ctx.set("collector", collector)
return TaskOutput("", next_tasks=[NextTask("task3")])
def task3(ctx):
collector = ctx.get("collector")
return TaskOutput(collector)
flow.add_task("task1", task1)
flow.add_task("task2", task2)
flow.add_task("task3", task3)
result = flow.run("task1")
assert result == {"task3": ["result2", "result2", "result2"]}
Agent LLM
Exemple d'un agent LLM pour la sélection dynamique d'outils :
from typing import List
import json
def llm_agent(context: Context) -> TaskOutput:
prompt = context.get("user_input")
llm_response = {
"reasoning": "Need to search database and format results",
"tools": ["search_db", "format_results"]
}
next_tasks = [NextTask(tool) for tool in llm_response["tools"]]
return TaskOutput(output="LLM agent response", next_tasks=next_tasks)
flow.add_task("llm_agent", llm_agent)
flow.run("llm_agent", inputs={"user_input": "Find data"})
© déclaration de droits d'auteur
Article copyright Cercle de partage de l'IA Tous, prière de ne pas reproduire sans autorisation.
Articles connexes
Pas de commentaires...