この記事は2024-12-04 18:42に更新されました。内容の一部は一刻を争うものですので、無効な場合はメッセージを残してください!
はじめに
Flowは、シンプルさと柔軟性を重視してAIエージェントを構築するために設計された軽量タスクエンジンです。従来のノードベースやエッジベースのワークフローとは異なり、Flowは並列実行、動的スケジューリング、インテリジェントな依存関係管理をサポートする動的タスクキューシステムを採用している。Flowのコアコンセプトは、並列タスク実行、動的ワークフロー、条件分岐制御により、複雑なワークフローをシンプルかつ簡単にすることです。Flowは、ノード間の事前定義されたエッジを必要とせず、動的タスクスケジューリングアーキテクチャを採用することで、開発者がよりクリーンで理解しやすいコードを書くことができます。
機能一覧
- 並列タスク実行: 明示的なスレッドコードなしでタスクを自動的に並列実行します。
- 動的スケジューリング:タスクは実行時に新しいタスクをスケジューリングできる。
- インテリジェントな依存関係管理:タスクは前の操作の結果を待つことができる。
- 状態管理:特定のタスクの実行を起点としたタスク状態の保存と読み込み。
- 条件分岐と制御フロー:条件分岐とループ制御がサポートされている。
- ストリーミング・タスク実行:タスク実行のストリーミングをサポート。
- 自動トレース:OpenTelemetryの自動トレースをサポートし、デバッグと状態の再構築を容易にします。
- 軽量で外部依存なし:設計はシンプルで柔軟かつ強力です。
ヘルプの使用
設置プロセス
Flowをインストールするには、pipコマンドを使うだけだ:
pip install lmnr-flow
基本的な使い方
以下はその簡単な使用例である:
from concurrent.futures import ThreadPoolExecutor
from lmnr_flow import Flow, TaskOutput, NextTask, Context
# Flowインスタンスを作成する
flow = Flow(thread_pool_executor=ThreadPoolExecutor(max_workers=4))
# 単純なタスクを定義する
def my_task(context: Context) -> TaskOutput.
return TaskOutput(output="Hello World!")
#タスクをFlowに追加する
flow.add_task("greet", my_task)
# タスクを実行する
result = flow.run("greet")
print(result) #出力:{"greet": "ハローワールド!"}。
ミッションチェーン
クエストは他のクエストを誘発することができる:
def task1(context: Context) -> TaskOutput.
return TaskOutput(output="result1", next_tasks=[NextTask("task2")])
def task2(context: Context) -> TaskOutput.
t1_result = context.get("task1")
return TaskOutput(output="result2")
flow.add_task("task1", task1)
flow.add_task("task2", task2)
flow.run("task1") # output: {"task2": "result2"}.
並列実行
複数のタスクを並行して実行できる:
def starter(context: Context) -> TaskOutput.
return TaskOutput(output="started", next_tasks=[NextTask("slow_task1"), NextTask("slow_task2")])
def slow_task1(context: Context) -> TaskOutput.
time.sleep(1)
return TaskOutput(output="result1")
def slow_task2(context: Context) -> TaskOutput.
time.sleep(1)
return TaskOutput(output="result2")
flow.add_task("starter", starter)
flow.add_task("slow_task1", slow_task1)
flow.add_task("slow_task2", slow_task2)
flow.run("starter") # 2つのタスクが並行して実行され、トータルにかかる時間は約1秒である。
ストリーミング結果
タスクは中間結果を返すようにストリームすることができる:
def streaming_task(context: Context) -> TaskOutput.
stream = context.get_stream()
for i in range(3): stream.put(StreamChunk("streaming_task"), f
stream.put(StreamChunk("streaming_task", f "interim_{i}"))
return TaskOutput(output="final")
flow.add_task("streaming_task", streaming_task)
for task_id, output in flow.stream("streaming_task"): print(f"{task_id}", streaming_task): return TaskOutput(output="final")
print(f"{task_id}: {output}")
ダイナミックなワークフロー
タスクは条件に基づいて動的にスケジューリングできる:
def conditional_task(context: Context) -> TaskOutput.
count = context.get("count", 0)
if count >= 3: return TaskOutput(output="done")
return TaskOutput(output="done")
context.set("count", count + 1)
return TaskOutput(output=f "iteration_{count}", next_tasks=[NextTask("conditional_task")])
flow.add_task("conditional_task", conditional_task)
flow.run("conditional_task") # タスクは3ループ後に完了する。
入力パラメータ
タスクは入力パラメーターを受け取ることができる:
def parameterised_task(context: Context) -> TaskOutput.
name = context.get("ユーザー名")
return TaskOutput(output=f "Hello {name}!")
flow.add_task("greet", parameterised_task)
result = flow.run("greet", inputs={"user_name": "Alice"})
print(result) #出力:{"greet": "こんにちはアリス!"}。
ダイナミックルーティング
タスクは入力に基づいて動的にルーティングできる:
def router(context: Context) -> TaskOutput.
task_type = context.get("タイプ")
routes = {
"process": [NextTask("process_task")], "analyse": [NextTask("analyse_task")
"analyse": [NextTask("analyse_task")]、
"レポート": [NextTask("report_task")], "レポート": [NextTask("report_task")
}
return TaskOutput(output=f "{task_type}へのルーティング", next_tasks=routes.get(task_type, []))
flow.add_task("router", router)
flow.add_task("process_task", lambda ctx: TaskOutput("processed data"))
flow.run("router", inputs={"type": "process"}) # output: {"process_task": "processed data"}.
ステータス管理
タスクの状態は保存およびロードできる:
コンテキスト = コンテキスト()
context.from_dict({"task1": "result1"})
flow = Flow(context=context)
flow.add_task("task2", lambda ctx: TaskOutput("result2"))
flow.run("task2")
assert flow.context.get("task1") == "result1"
assert flow.context.get("task2") == "result2"
マップリダクション
タスクはMap Reduceオペレーションを実行できる:
def task1(ctx).
ctx.set("collector", [])
return TaskOutput("result1", next_tasks=[NextTask("task2", spawn_another=True) for _ in range(3)])
def task2(ctx):
collector = ctx.get("collector")
collector.append("result2")
ctx.set("collector", collector)
return TaskOutput("", next_tasks=[NextTask("task3")])
def task3(ctx).
collector = ctx.get("collector").
return TaskOutput(collector)
flow.add_task("task1", task1)
flow.add_task("task2", task2)
flow.add_task("task3", task3)
result = flow.run("タスク1")
assert result == {"task3": ["result2", "result2", "result2"]}.
LLMエージェント
動的なツール選択のためのLLMエージェントの例:
from typing import リスト
インポート json
def llm_agent(context: Context) -> TaskOutput.
prompt = context.get("user_input")
llm_response = { { "reasoning": "データベースを検索する必要がある。
"reasoning": "データベースを検索し、結果をフォーマットする必要がある"、
「ツール": ["search_db", "format_results"].
}
next_tasks = [NextTask(tool) for tool in llm_response["tools"]] タスクアウトプットを返す。
return TaskOutput(output="LLM agent response", next_tasks=next_tasks)
flow.add_task("llm_agent", llm_agent)
flow.run("llm_agent", inputs={"user_input": "Find data"})