AI Personal Learning
und praktische Anleitung

Flow (Laminar): eine leichtgewichtige Task-Engine für den Aufbau von Intelligenz, die Aufgaben vereinfacht und flexibel verwaltet

Allgemeine Einführung

Flow ist eine leichtgewichtige Task-Engine, die für die Entwicklung von KI-Agenten entwickelt wurde, wobei der Schwerpunkt auf Einfachheit und Flexibilität liegt. Im Gegensatz zu traditionellen knoten- und kantenbasierten Workflows verwendet Flow ein dynamisches Aufgabenwarteschlangensystem, das parallele Ausführung, dynamische Planung und intelligentes Abhängigkeitsmanagement unterstützt. Flow benötigt keine vordefinierten Kanten zwischen Knoten und verwendet eine dynamische Task-Scheduling-Architektur, um Entwicklern zu helfen, sauberen, leicht verständlichen Code zu schreiben. Flow wird vom Laminar-Team gewartet, unterstützt automatisches Tracing und Zustandsmanagement und ist für eine Vielzahl von KI-Anwendungsszenarien geeignet.

 

Funktionsliste

  • Parallele Task-Ausführung: Automatische parallele Ausführung von Tasks ohne expliziten Threading-Code.
  • Dynamische Planung: Aufgaben können zur Laufzeit neue Aufgaben planen.
  • Intelligentes Abhängigkeitsmanagement: Aufgaben können auf das Ergebnis eines vorangegangenen Vorgangs warten.
  • Zustandsverwaltung: Speichern und Laden der Zustände von Aufgaben, beginnend mit der Ausführung einer bestimmten Aufgabe.
  • Bedingte Verzweigungen und Kontrollfluss: Bedingte Verzweigungen und Schleifensteuerung werden unterstützt.
  • Streaming Task Execution: Unterstützt das Streaming der Task-Ausführung.
  • Automatisiertes Tracing: Unterstützt das automatisierte Tracing von OpenTelemetry zur einfachen Fehlersuche und Zustandsrekonstruktion.
  • Geringes Gewicht und keine externen Abhängigkeiten: Das Design ist einfach, flexibel und leistungsstark.

 

Hilfe verwenden

Ablauf der Installation

Um Flow zu installieren, verwenden Sie einfach den Befehl pip:

pip install lmnr-flow

Grundlegende Verwendung

Nachfolgend finden Sie ein einfaches Beispiel für seine Verwendung:

from concurrent.futures importieren ThreadPoolExecutor
from lmnr_flow import Flow, TaskOutput, NextTask, Context
# Erstellen einer Flow-Instanz
flow = Flow(thread_pool_executor=ThreadPoolExecutor(max_workers=4))
# Definieren Sie eine einfache Aufgabe
def my_task(context: Context) -> TaskOutput.
return TaskOutput(output="Hallo Welt!")
# Hinzufügen einer Aufgabe zu Flow
flow.add_task("grüßen", my_task)
# Ausführen der Aufgabe
result = flow.run("grüßen")
print(result) # Ausgabe: {"greet": "Hallo Welt!"}

Missionskette

Quests können andere Quests auslösen:

def task1(context: Context) -> TaskOutput.
return TaskOutput(output="result1", next_tasks=[NextTask("task2")])
def task2(context: Context) -> TaskOutput.
t1_result = context.get("aufgabe1")
return TaskOutput(output="result2")
flow.add_task("aufgabe1", aufgabe1)
flow.add_task("aufgabe2", aufgabe2)
flow.run("task1") # output: {"task2": "result2"}

parallele Ausführung

Mehrere Aufgaben können parallel ausgeführt werden:

def starter(context: Context) -> TaskOutput.
return TaskOutput(output="gestartet", next_tasks=[NextTask("slow_task1"), NextTask("slow_task2")])
def slow_task1(context: Context) -> TaskOutput.
time.sleep(1)
return TaskOutput(output="result1")
def slow_task2(context: Context) -> TaskOutput.
time.sleep(1)
return TaskOutput(output="ergebnis2")
flow.add_task("starter", starter)
flow.add_task("slow_task1", slow_task1)
flow.add_task("slow_task2", slow_task2)
flow.run("starter") # Die beiden Aufgaben werden parallel ausgeführt, und die Gesamtzeit beträgt etwa 1 Sekunde.

Streaming-Ergebnisse

Aufgaben können gestreamt werden, um Zwischenergebnisse zu liefern:

def streaming_task(context: Context) -> TaskOutput.
stream = context.get_stream()
for i in range(3): stream.put(StreamChunk("streaming_task"), f
stream.put(StreamChunk("streaming_task", f "interim_{i}"))
return TaskOutput(output="final")
flow.add_task("streaming_task", streaming_task)
for task_id, output in flow.stream("streaming_task"): print(f"{task_id}", streaming_task")
print(f"{Aufgaben_id}: {Ausgabe}")

Dynamischer Arbeitsablauf

Aufgaben können auf der Grundlage von Bedingungen dynamisch geplant werden:

def conditional_task(context: Context) -> TaskOutput.
count = context.get("count", 0)
if count >= 3: return TaskOutput(output="done")
return TaskOutput(ausgabe="erledigt")
context.set("count", count + 1)
return TaskOutput(output=f "iteration_{count}", next_tasks=[NextTask("conditional_task")])
flow.add_task("bedingte_Aufgabe", bedingte_Aufgabe)
flow.run("conditional_task") # Die Aufgabe ist nach 3 Schleifen abgeschlossen

Eingabeparameter

Aufgaben können Eingabeparameter erhalten:

def parameterised_task(context: Context) -> TaskOutput.
name = context.get("user_name")
return TaskOutput(output=f "Hallo {Name}!")
flow.add_task("grüßen", parameterised_task)
result = flow.run("greet", inputs={"user_name": "Alice"})
print(result) # output: {"greet": "Hallo Alice!"}

dynamisches Routing

Aufgaben können auf der Grundlage von Eingaben dynamisch weitergeleitet werden:

def router(context: Context) -> TaskOutput.
task_type = context.get("type")
routes = {
"process": [NextTask("process_task")], "analyse": [NextTask("analyse_task")
"analyse": [NextTask("analyse_task")],
"report": [NextTask("report_task")]
}
return TaskOutput(output=f "routing to {task_type}", next_tasks=routes.get(task_type, []))
flow.add_task("Router", Router)
flow.add_task("process_task", lambda ctx: TaskOutput("verarbeitete Daten"))
flow.run("router", inputs={"type": "process"}) # output: {"process_task": "processed data"}

Statusverwaltung

Aufgabenstatus können gespeichert und geladen werden:

Kontext = Kontext()
context.from_dict({"Aufgabe1": "Ergebnis1"})
flow = Flow(kontext=Kontext)
flow.add_task("task2", lambda ctx: TaskOutput("result2"))
flow.run("aufgabe2")
assert flow.context.get("aufgabe1") == "ergebnis1"
assert flow.context.get("aufgabe2") == "ergebnis2"

Map Reduce

Tasks können Map-Reduce-Operationen durchführen:

def task1(ctx).
ctx.set("collector", [])
return TaskOutput("result1", next_tasks=[NextTask("task2", spawn_another=True) for _ in range(3)])
def task2(ctx):
collector = ctx.get("collector")
collector.append("ergebnis2")
ctx.set("collector", collector)
return TaskOutput("", next_tasks=[NextTask("task3")])
def task3(ctx).
Sammler = ctx.get("Sammler")
return TaskOutput(collector)
flow.add_task("aufgabe1", aufgabe1)
flow.add_task("aufgabe2", aufgabe2)
flow.add_task("aufgabe3", aufgabe3)
ergebnis = flow.run("aufgabe1")
assert result == {"task3": ["result2", "result2", "result2"]}

LLM-Agent

Beispiel für einen LLM-Agenten für die dynamische Werkzeugauswahl:

from typing import List
import json
def llm_agent(context: Context) -> TaskOutput.
prompt = context.get("user_input")
llm_response = {
"reasoning": "Muss Datenbank durchsuchen und Ergebnisse formatieren",
"tools": ["search_db", "format_results"]
}
next_tasks = [NextTask(tool) for tool in llm_response["tools"]]
return TaskOutput(output="LLM-Agent Antwort", next_tasks=next_tasks)
flow.add_task("llm_agent", llm_agent)
flow.run("llm_agent", inputs={"user_input": "Daten finden"})

Darf nicht ohne Genehmigung vervielfältigt werden:Chef-KI-Austauschkreis " Flow (Laminar): eine leichtgewichtige Task-Engine für den Aufbau von Intelligenz, die Aufgaben vereinfacht und flexibel verwaltet

Chef-KI-Austauschkreis

Der Chief AI Sharing Circle konzentriert sich auf das KI-Lernen und bietet umfassende KI-Lerninhalte, KI-Tools und praktische Anleitungen. Unser Ziel ist es, den Nutzern dabei zu helfen, die KI-Technologie zu beherrschen und gemeinsam das unbegrenzte Potenzial der KI durch hochwertige Inhalte und den Austausch praktischer Erfahrungen zu erkunden. Egal, ob Sie ein KI-Anfänger oder ein erfahrener Experte sind, dies ist der ideale Ort für Sie, um Wissen zu erwerben, Ihre Fähigkeiten zu verbessern und Innovationen zu verwirklichen.

Kontaktieren Sie uns
de_DE_formalDeutsch (Sie)