Aprendizagem pessoal com IA
e orientação prática

Flow (Laminar): um mecanismo de tarefas leve para a criação de inteligências que simplifica e gerencia tarefas com flexibilidade

Introdução geral

O Flow é um mecanismo de tarefas leve projetado para criar agentes de IA com ênfase na simplicidade e na flexibilidade. Ao contrário dos fluxos de trabalho tradicionais baseados em nós e bordas, o Flow usa um sistema dinâmico de fila de tarefas que suporta execução paralela, agendamento dinâmico e gerenciamento inteligente de dependências. Seu conceito central é tornar fluxos de trabalho complexos simples e fáceis por meio da execução de tarefas paralelas, fluxos de trabalho dinâmicos e controle de ramificação condicional. O Flow não exige bordas predefinidas entre nós e adota uma arquitetura de agendamento de tarefas dinâmicas para ajudar os desenvolvedores a escrever códigos mais limpos e fáceis de entender.

 

Lista de funções

  • Execução de tarefas paralelas: executa automaticamente tarefas em paralelo sem código de encadeamento explícito.
  • Agendamento dinâmico: as tarefas podem agendar novas tarefas em tempo de execução.
  • Gerenciamento inteligente de dependências: as tarefas podem aguardar o resultado de uma operação anterior.
  • Gerenciamento de estados: salvar e carregar estados de tarefas, a partir da execução de uma tarefa específica.
  • Ramificação condicional e fluxo de controle: há suporte para ramificação condicional e controle de loop.
  • Execução de tarefas em fluxo contínuo: oferece suporte à execução de tarefas em fluxo contínuo.
  • Rastreamento automatizado: suporta o rastreamento automatizado do OpenTelemetry para facilitar a depuração e a reconstrução do estado.
  • Leve e sem dependências externas: o design é simples, flexível e avançado.

 

Usando a Ajuda

Processo de instalação

Para instalar o Flow, basta usar o comando pip:

pip install lmnr-flow

Uso básico

Abaixo está um exemplo simples de seu uso:

de concurrent.futures import ThreadPoolExecutor
from lmnr_flow import Flow, TaskOutput, NextTask, Context
# Criar uma instância do Flow
flow = Flow(thread_pool_executor=ThreadPoolExecutor(max_workers=4))
# Definir uma tarefa simples
def my_task(context: Context) -> TaskOutput.
return TaskOutput(output="Hello World!")
# Adicionar uma tarefa ao Flow
flow.add_task("greet", my_task)
# Executar a tarefa
result = flow.run("greet")
print(result) # Saída: {"greet": "Hello World!"}

cadeia de missões

As missões podem desencadear outras missões:

def task1(context: Context) -> TaskOutput.
return TaskOutput(output="result1", next_tasks=[NextTask("task2")])
def task2(context: Context) -> TaskOutput.
t1_result = context.get("task1")
return TaskOutput(output="result2")
flow.add_task("task1", task1)
flow.add_task("task2", task2)
flow.run("task1") # output: {"task2": "result2"}

execução paralela

Várias tarefas podem ser executadas em paralelo:

def starter(context: Context) -> TaskOutput.
return TaskOutput(output="started", next_tasks=[NextTask("slow_task1"), NextTask("slow_task2")])
def slow_task1(context: Context) -> TaskOutput.
time.sleep(1)
return TaskOutput(output="result1")
def slow_task2(context: Context) -> TaskOutput.
time.sleep(1)
return TaskOutput(output="result2")
flow.add_task("starter", starter)
flow.add_task("slow_task1", slow_task1)
flow.add_task("slow_task2", slow_task2)
flow.run("starter") # As duas tarefas são executadas em paralelo, e o tempo total gasto é de aproximadamente 1 segundo.

Resultados de streaming

As tarefas podem ser transmitidas para retornar resultados intermediários:

def streaming_task(context: Context) -> TaskOutput.
stream = context.get_stream()
for i in range(3): stream.put(StreamChunk("streaming_task"), f
stream.put(StreamChunk("streaming_task", f "interim_{i}"))
return TaskOutput(output="final")
flow.add_task("streaming_task", streaming_task)
for task_id, output in flow.stream("streaming_task"): print(f"{task_id}", streaming_task): return TaskOutput(output="final")
print(f"{task_id}: {output}")

Fluxo de trabalho dinâmico

As tarefas podem ser agendadas dinamicamente com base em condições:

def conditional_task(context: Context) -> TaskOutput.
count = context.get("count", 0)
se count >= 3: return TaskOutput(output="done")
return TaskOutput(output="done")
context.set("count", count + 1)
return TaskOutput(output=f "iteration_{count}", next_tasks=[NextTask("conditional_task")])
flow.add_task("conditional_task", conditional_task)
flow.run("conditional_task") # A tarefa é concluída após 3 loops

parâmetro de entrada

As tarefas podem receber parâmetros de entrada:

def parameterised_task(context: Context) -> TaskOutput.
name = context.get("nome_do_usuário")
return TaskOutput(output=f "Hello {name}!")
flow.add_task("greet", parameterised_task)
result = flow.run("greet", inputs={"user_name": "Alice"})
print(result) # output: {"greet": "Hello Alice!"}

roteamento dinâmico

As tarefas podem ser roteadas dinamicamente com base na entrada:

def router(context: Context) -> TaskOutput.
task_type = context.get("type")
rotas = {
"process": [NextTask("process_task")], "analyse": [NextTask("analyse_task")
"analyse": [NextTask("analyse_task")],
"report": [NextTask("report_task")]
}
return TaskOutput(output=f "roteamento para {tipo_de_tarefa}", next_tasks=routes.get(tipo_de_tarefa, []))
flow.add_task("router", router)
flow.add_task("process_task", lambda ctx: TaskOutput("processed data"))
flow.run("router", inputs={"type": "process"}) # output: {"process_task": "processed data"}

Gerenciamento de status

Os estados da tarefa podem ser salvos e carregados:

contexto = Contexto()
context.from_dict({"task1": "result1"})
flow = Flow(context=context)
flow.add_task("task2", lambda ctx: TaskOutput("result2"))
flow.run("task2")
Assert flow.context.get("task1") == "result1"
Assert flow.context.get("task2") == "result2"

Redução de mapas

As tarefas podem executar operações de Map Reduce:

def task1(ctx).
ctx.set("collector", [])
return TaskOutput("result1", next_tasks=[NextTask("task2", spawn_another=True) for _ in range(3)])
def task2(ctx):
collector = ctx.get("collector")
collector.append("result2")
ctx.set("collector", collector)
return TaskOutput("", next_tasks=[NextTask("task3")])
def task3(ctx).
collector = ctx.get("collector")
return TaskOutput(collector)
flow.add_task("task1", task1)
flow.add_task("task2", task2)
flow.add_task("task3", task3)
result = flow.run("task1")
assert result == {"task3": ["result2", "result2", "result2"]}

Agente LLM

Exemplo de um agente LLM para seleção dinâmica de ferramentas:

from typing import List
import json
def llm_agent(context: Context) -> TaskOutput.
prompt = context.get("user_input")
llm_response = {
"reasoning": "Necessidade de pesquisar o banco de dados e formatar os resultados",
"tools": ["search_db", "format_results"]
}
next_tasks = [NextTask(tool) for tool in llm_response["tools"]]
return TaskOutput(output="LLM agent response", next_tasks=next_tasks)
flow.add_task("llm_agent", llm_agent)
flow.run("llm_agent", inputs={"user_input": "Find data"})

Não pode ser reproduzido sem permissão:Chefe do Círculo de Compartilhamento de IA " Flow (Laminar): um mecanismo de tarefas leve para a criação de inteligências que simplifica e gerencia tarefas com flexibilidade

Chefe do Círculo de Compartilhamento de IA

O Chief AI Sharing Circle se concentra no aprendizado de IA, fornecendo conteúdo abrangente de aprendizado de IA, ferramentas de IA e orientação prática. Nosso objetivo é ajudar os usuários a dominar a tecnologia de IA e explorar juntos o potencial ilimitado da IA por meio de conteúdo de alta qualidade e compartilhamento de experiências práticas. Seja você um iniciante em IA ou um especialista sênior, este é o lugar ideal para adquirir conhecimento, aprimorar suas habilidades e realizar inovações.

Entre em contato conosco
pt_BRPortuguês do Brasil