AIパーソナル・ラーニング
と実践的なガイダンス

Flow(Laminar):タスクを簡素化し、柔軟に管理するインテリジェンス構築のための軽量タスクエンジン

この記事は2024-12-04 18:42に更新されました。内容の一部は一刻を争うものですので、無効な場合はメッセージを残してください!

はじめに

Flowは、シンプルさと柔軟性を重視してAIエージェントを構築するために設計された軽量タスクエンジンです。従来のノードベースやエッジベースのワークフローとは異なり、Flowは並列実行、動的スケジューリング、インテリジェントな依存関係管理をサポートする動的タスクキューシステムを採用している。Flowのコアコンセプトは、並列タスク実行、動的ワークフロー、条件分岐制御により、複雑なワークフローをシンプルかつ簡単にすることです。Flowは、ノード間の事前定義されたエッジを必要とせず、動的タスクスケジューリングアーキテクチャを採用することで、開発者がよりクリーンで理解しやすいコードを書くことができます。

 

機能一覧

  • 並列タスク実行: 明示的なスレッドコードなしでタスクを自動的に並列実行します。
  • 動的スケジューリング:タスクは実行時に新しいタスクをスケジューリングできる。
  • インテリジェントな依存関係管理:タスクは前の操作の結果を待つことができる。
  • 状態管理:特定のタスクの実行を起点としたタスク状態の保存と読み込み。
  • 条件分岐と制御フロー:条件分岐とループ制御がサポートされている。
  • ストリーミング・タスク実行:タスク実行のストリーミングをサポート。
  • 自動トレース:OpenTelemetryの自動トレースをサポートし、デバッグと状態の再構築を容易にします。
  • 軽量で外部依存なし:設計はシンプルで柔軟かつ強力です。

 

ヘルプの使用

設置プロセス

Flowをインストールするには、pipコマンドを使うだけだ:

pip install lmnr-flow

基本的な使い方

以下はその簡単な使用例である:

from concurrent.futures import ThreadPoolExecutor
from lmnr_flow import Flow, TaskOutput, NextTask, Context
# Flowインスタンスを作成する
flow = Flow(thread_pool_executor=ThreadPoolExecutor(max_workers=4))
# 単純なタスクを定義する
def my_task(context: Context) -> TaskOutput.
return TaskOutput(output="Hello World!")
#タスクをFlowに追加する
flow.add_task("greet", my_task)
# タスクを実行する
result = flow.run("greet")
print(result) #出力:{"greet": "ハローワールド!"}。

ミッションチェーン

クエストは他のクエストを誘発することができる:

def task1(context: Context) -> TaskOutput.
return TaskOutput(output="result1", next_tasks=[NextTask("task2")])
def task2(context: Context) -> TaskOutput.
t1_result = context.get("task1")
return TaskOutput(output="result2")
flow.add_task("task1", task1)
flow.add_task("task2", task2)
flow.run("task1") # output: {"task2": "result2"}.

並列実行

複数のタスクを並行して実行できる:

def starter(context: Context) -> TaskOutput.
return TaskOutput(output="started", next_tasks=[NextTask("slow_task1"), NextTask("slow_task2")])
def slow_task1(context: Context) -> TaskOutput.
time.sleep(1)
return TaskOutput(output="result1")
def slow_task2(context: Context) -> TaskOutput.
time.sleep(1)
return TaskOutput(output="result2")
flow.add_task("starter", starter)
flow.add_task("slow_task1", slow_task1)
flow.add_task("slow_task2", slow_task2)
flow.run("starter") # 2つのタスクが並行して実行され、トータルにかかる時間は約1秒である。

ストリーミング結果

タスクは中間結果を返すようにストリームすることができる:

def streaming_task(context: Context) -> TaskOutput.
stream = context.get_stream()
for i in range(3): stream.put(StreamChunk("streaming_task"), f
stream.put(StreamChunk("streaming_task", f "interim_{i}"))
return TaskOutput(output="final")
flow.add_task("streaming_task", streaming_task)
for task_id, output in flow.stream("streaming_task"): print(f"{task_id}", streaming_task): return TaskOutput(output="final")
print(f"{task_id}: {output}")

ダイナミックなワークフロー

タスクは条件に基づいて動的にスケジューリングできる:

def conditional_task(context: Context) -> TaskOutput.
count = context.get("count", 0)
if count >= 3: return TaskOutput(output="done")
return TaskOutput(output="done")
context.set("count", count + 1)
return TaskOutput(output=f "iteration_{count}", next_tasks=[NextTask("conditional_task")])
flow.add_task("conditional_task", conditional_task)
flow.run("conditional_task") # タスクは3ループ後に完了する。

入力パラメータ

タスクは入力パラメーターを受け取ることができる:

def parameterised_task(context: Context) -> TaskOutput.
name = context.get("ユーザー名")
return TaskOutput(output=f "Hello {name}!")
flow.add_task("greet", parameterised_task)
result = flow.run("greet", inputs={"user_name": "Alice"})
print(result) #出力:{"greet": "こんにちはアリス!"}。

ダイナミックルーティング

タスクは入力に基づいて動的にルーティングできる:

def router(context: Context) -> TaskOutput.
task_type = context.get("タイプ")
routes = {
"process": [NextTask("process_task")], "analyse": [NextTask("analyse_task")
"analyse": [NextTask("analyse_task")]、
"レポート": [NextTask("report_task")], "レポート": [NextTask("report_task")
}
return TaskOutput(output=f "{task_type}へのルーティング", next_tasks=routes.get(task_type, []))
flow.add_task("router", router)
flow.add_task("process_task", lambda ctx: TaskOutput("processed data"))
flow.run("router", inputs={"type": "process"}) # output: {"process_task": "processed data"}.

ステータス管理

タスクの状態は保存およびロードできる:

コンテキスト = コンテキスト()
context.from_dict({"task1": "result1"})
flow = Flow(context=context)
flow.add_task("task2", lambda ctx: TaskOutput("result2"))
flow.run("task2")
assert flow.context.get("task1") == "result1"
assert flow.context.get("task2") == "result2"

マップリダクション

タスクはMap Reduceオペレーションを実行できる:

def task1(ctx).
ctx.set("collector", [])
return TaskOutput("result1", next_tasks=[NextTask("task2", spawn_another=True) for _ in range(3)])
def task2(ctx):
collector = ctx.get("collector")
collector.append("result2")
ctx.set("collector", collector)
return TaskOutput("", next_tasks=[NextTask("task3")])
def task3(ctx).
collector = ctx.get("collector").
return TaskOutput(collector)
flow.add_task("task1", task1)
flow.add_task("task2", task2)
flow.add_task("task3", task3)
result = flow.run("タスク1")
assert result == {"task3": ["result2", "result2", "result2"]}.

LLMエージェント

動的なツール選択のためのLLMエージェントの例:

from typing import リスト
インポート json
def llm_agent(context: Context) -> TaskOutput.
prompt = context.get("user_input")
llm_response = { { "reasoning": "データベースを検索する必要がある。
"reasoning": "データベースを検索し、結果をフォーマットする必要がある"、
「ツール": ["search_db", "format_results"].
}
next_tasks = [NextTask(tool) for tool in llm_response["tools"]] タスクアウトプットを返す。
return TaskOutput(output="LLM agent response", next_tasks=next_tasks)
flow.add_task("llm_agent", llm_agent)
flow.run("llm_agent", inputs={"user_input": "Find data"})

AIイージー・ラーニング

AIを始めるための素人ガイド

AIツールの活用方法を、低コスト・ゼロベースから学ぶことができます。AIはオフィスソフトと同様、誰にとっても必須のスキルです。 AIをマスターすれば、就職活動で有利になり、今後の仕事や勉強の労力も半減します。

詳細を見る
無断転載を禁じます:チーフAIシェアリングサークル " Flow(Laminar):タスクを簡素化し、柔軟に管理するインテリジェンス構築のための軽量タスクエンジン

チーフAIシェアリングサークル

チーフAIシェアリングサークルは、AI学習に焦点を当て、包括的なAI学習コンテンツ、AIツール、実践指導を提供しています。私たちの目標は、高品質のコンテンツと実践的な経験の共有を通じて、ユーザーがAI技術を習得し、AIの無限の可能性を一緒に探求することです。AI初心者でも上級者でも、知識を得てスキルを向上させ、イノベーションを実現するための理想的な場所です。

お問い合わせ
ja日本語