Flow(Laminar): 작업을 간소화하고 유연하게 관리하는 빌딩 인텔리전스를 위한 경량 작업 엔진입니다.

최신 AI 리소스8개월 전 업데이트 AI 공유 서클
1.5K 00

일반 소개

Flow는 단순성과 유연성에 중점을 두고 AI 에이전트를 구축하기 위해 설계된 경량 작업 엔진입니다. 기존의 노드 및 엣지 기반 워크플로와 달리 Flow는 병렬 실행, 동적 스케줄링 및 지능형 종속성 관리를 지원하는 동적 작업 대기열 시스템을 사용합니다. 핵심 개념은 병렬 작업 실행, 동적 워크플로 및 조건부 분기 제어를 통해 복잡한 워크플로를 간단하고 쉽게 만드는 것입니다. Flow는 노드 간에 사전 정의된 에지가 필요하지 않으며, 개발자가 더 깔끔하고 이해하기 쉬운 코드를 작성할 수 있도록 동적 작업 스케줄링 아키텍처를 채택하고 있습니다.Laminar 팀에서 관리하는 Flow는 자동화된 추적 및 상태 관리를 지원하며 다양한 AI 애플리케이션 시나리오에 적합합니다.

 

기능 목록

  • 병렬 작업 실행: 명시적인 스레딩 코드 없이 자동으로 작업을 병렬로 실행합니다.
  • 동적 예약: 작업은 런타임에 새 작업을 예약할 수 있습니다.
  • 지능형 종속성 관리: 작업은 이전 작업의 결과를 기다릴 수 있습니다.
  • 상태 관리: 특정 작업의 실행부터 시작하여 작업 상태를 저장하고 로드합니다.
  • 조건부 분기 및 제어 흐름: 조건부 분기 및 루프 제어가 지원됩니다.
  • 작업 실행 스트리밍: 작업 실행 스트리밍을 지원합니다.
  • 자동화된 추적: 간편한 디버깅 및 상태 재구성을 위해 OpenTelemetry의 자동화된 추적을 지원합니다.
  • 가볍고 외부 종속성이 없는 디자인: 단순하고 유연하며 강력한 기능을 제공합니다.

 

도움말 사용

설치 프로세스

Flow를 설치하려면 pip 명령을 사용하면 됩니다:

pip install lmnr-flow

기본 사용

다음은 간단한 사용 예시입니다:

from concurrent.futures import ThreadPoolExecutor
from lmnr_flow import Flow, TaskOutput, NextTask, Context
# 创建Flow实例
flow = Flow(thread_pool_executor=ThreadPoolExecutor(max_workers=4))
# 定义一个简单任务
def my_task(context: Context) -> TaskOutput:
return TaskOutput(output="Hello World!")
# 添加任务到Flow
flow.add_task("greet", my_task)
# 运行任务
result = flow.run("greet")
print(result)  # 输出: {"greet": "Hello World!"}

미션 체인

퀘스트는 다른 퀘스트를 트리거할 수 있습니다:

def task1(context: Context) -> TaskOutput:
return TaskOutput(output="result1", next_tasks=[NextTask("task2")])
def task2(context: Context) -> TaskOutput:
t1_result = context.get("task1")
return TaskOutput(output="result2")
flow.add_task("task1", task1)
flow.add_task("task2", task2)
flow.run("task1")  # 输出: {"task2": "result2"}

병렬 실행

여러 작업을 병렬로 실행할 수 있습니다:

def starter(context: Context) -> TaskOutput:
return TaskOutput(output="started", next_tasks=[NextTask("slow_task1"), NextTask("slow_task2")])
def slow_task1(context: Context) -> TaskOutput:
time.sleep(1)
return TaskOutput(output="result1")
def slow_task2(context: Context) -> TaskOutput:
time.sleep(1)
return TaskOutput(output="result2")
flow.add_task("starter", starter)
flow.add_task("slow_task1", slow_task1)
flow.add_task("slow_task2", slow_task2)
flow.run("starter")  # 两个任务并行执行,总耗时约1秒

스트리밍 결과

작업을 스트리밍하여 중간 결과를 반환할 수 있습니다:

def streaming_task(context: Context) -> TaskOutput:
stream = context.get_stream()
for i in range(3):
stream.put(StreamChunk("streaming_task", f"interim_{i}"))
return TaskOutput(output="final")
flow.add_task("streaming_task", streaming_task)
for task_id, output in flow.stream("streaming_task"):
print(f"{task_id}: {output}")

동적 워크플로

조건에 따라 작업을 동적으로 예약할 수 있습니다:

def conditional_task(context: Context) -> TaskOutput:
count = context.get("count", 0)
if count >= 3:
return TaskOutput(output="done")
context.set("count", count + 1)
return TaskOutput(output=f"iteration_{count}", next_tasks=[NextTask("conditional_task")])
flow.add_task("conditional_task", conditional_task)
flow.run("conditional_task")  # 任务循环3次后完成

입력 매개변수

작업은 입력 매개변수를 받을 수 있습니다:

def parameterized_task(context: Context) -> TaskOutput:
name = context.get("user_name")
return TaskOutput(output=f"Hello {name}!")
flow.add_task("greet", parameterized_task)
result = flow.run("greet", inputs={"user_name": "Alice"})
print(result)  # 输出: {"greet": "Hello Alice!"}

동적 라우팅

입력에 따라 작업을 동적으로 라우팅할 수 있습니다:

def router(context: Context) -> TaskOutput:
task_type = context.get("type")
routes = {
"process": [NextTask("process_task")],
"analyze": [NextTask("analyze_task")],
"report": [NextTask("report_task")]
}
return TaskOutput(output=f"routing to {task_type}", next_tasks=routes.get(task_type, []))
flow.add_task("router", router)
flow.add_task("process_task", lambda ctx: TaskOutput("processed data"))
flow.run("router", inputs={"type": "process"})  # 输出: {"process_task": "processed data"}

상태 관리

작업 상태를 저장하고 로드할 수 있습니다:

context = Context()
context.from_dict({"task1": "result1"})
flow = Flow(context=context)
flow.add_task("task2", lambda ctx: TaskOutput("result2"))
flow.run("task2")
assert flow.context.get("task1") == "result1"
assert flow.context.get("task2") == "result2"

지도 축소

작업에서 맵 축소 작업을 수행할 수 있습니다:

def task1(ctx):
ctx.set("collector", [])
return TaskOutput("result1", next_tasks=[NextTask("task2", spawn_another=True) for _ in range(3)])
def task2(ctx):
collector = ctx.get("collector")
collector.append("result2")
ctx.set("collector", collector)
return TaskOutput("", next_tasks=[NextTask("task3")])
def task3(ctx):
collector = ctx.get("collector")
return TaskOutput(collector)
flow.add_task("task1", task1)
flow.add_task("task2", task2)
flow.add_task("task3", task3)
result = flow.run("task1")
assert result == {"task3": ["result2", "result2", "result2"]}

LLM 에이전트

동적 도구 선택을 위한 LLM 에이전트의 예시입니다:

from typing import List
import json
def llm_agent(context: Context) -> TaskOutput:
prompt = context.get("user_input")
llm_response = {
"reasoning": "Need to search database and format results",
"tools": ["search_db", "format_results"]
}
next_tasks = [NextTask(tool) for tool in llm_response["tools"]]
return TaskOutput(output="LLM agent response", next_tasks=next_tasks)
flow.add_task("llm_agent", llm_agent)
flow.run("llm_agent", inputs={"user_input": "Find data"})
© 저작권 정책
AiPPT

관련 문서

댓글 없음

댓글에 참여하려면 로그인해야 합니다!
지금 로그인
없음
댓글 없음...