Montag, 3. September 2012

Der Fortschritt in Datenflüssen

Datenflüsse sehen einfach aus, sind es auch in gewisser Weise – dennoch haben sie es in sich. Da habe ich neulich gemerkt, als ein Programm, das ich mit Flow-Design locker entworfen hatte, sich dann doch nicht so verhielt, wie erwartet. Es hat funktioniert, das war kein Problem. Aber sein Output war irgendwie, hm, strange.

Deshalb will ich hier einmal die Frage beleuchten, wie denn eigentlich die Verarbeitung in Datenflüssen fortschreitet. Oder besser: Welche unterschiedlichen Fortschrittsweisen kann es denn geben?

Ein Beispielszenario soll die unterschiedlichen Fortschrittsweisen vergleichbar machen. Es ist ganz einfach:

image

Nachrichten, die von einer Funktionseinheit verarbeitet werden, bezeichne ich mit deren Namen: a(x) kommt “von draußen” und wird von a() verarbeitet, b() erzeugt c(y), die von c() verarbeitet wird usw. Nachrichten tragen also das Ziel und nicht die Quelle im Namen. Für das Beispiel reicht das als Identifikation.

Ausgehen von einer Nachricht a() erzeugen die Funktionseinheiten nun diese Nachrichten:

  • a(1)
    • b(11)
      • c(111)
        • e(1111)
        • e(1112)
      • d(111)
      • c(112)
        • e(1121)
    • b(12)
      • d(121)
      • d(122)
      • c(121)
    • b(13)
      • c(131)
        • e(1311)
        • e(1312)
      • d(131)
      • c(132)
        • e(1321)
      • d(132)
      • d(133)
      • c(133)
        • e(1331)
        • e(1332)
      • d(134)

Dieser Baum beschreibt Input-Output-Zusammenhänge, z.B. Input b(12) an b() führt zum Output d(121), d(122) und c(121). Das bedeutet auch, das c(121) nach d(122) erzeugt wird.

Allerdings steckt keine Aussage darüber in dem Baum, wann die Nachrichten verarbeitet werden. Verarbeitet c() die Nachricht c(132) während b() noch weiteren Output generiert oder erst nachdem d(134) ausgegeben wurde?

Darauf geben die Fortschrittsweisen Antwort.

Depth-first

Die naheliegende Vorstellung vom Fortschritt der Verarbeitungsweise des Flows ist wohl, dass jede Nachricht sofort verarbeitet wird. Auf einen Zeitstrahl aufgetragen sieht das so aus:

image

Hier spiegelt sich der Baum direkt wider. So würde die Verarbeitung auch verlaufen, wenn der Datenfluss als Call Stack interpretiert würde, also die Funktionseinheiten sich geschachtelt aufriefen.

imageEine Schachtelung im Sinne einer Servicehierarchie soll ja aber gerade mit Flow-Design vermieden werden. Die Funktionseinheiten a()..e() sollen sich nicht kennen; c() soll nicht von e() abhängen, b() nicht von c() und d(), a() nicht von b().

Alternativ kann diese Fortschrittweise aber auch mit Event-Based Components (EBC) erzielt werden: Wenn a() mit b(11) einen Event feuert, dann arbeitet b() als Event-Handler den erst ab, bevor a() dazu kommt, b(12) auszugeben. Wenn b() währenddessen c(111) erzeugt, arbeitet c() die Nachricht erst ab, bevor b() d(111) erzeugt usw.

Alle Nachrichten werden also erstens sequenziell verarbeitet, d.h. nacheinander und auch noch streng in der Reihenfolge, in der sie erzeugt wurden. Und zweitens werden sie synchron verarbeitet, d.h. während der Verarbeitung wartet die erzeugende Funktionseinheit.

Die synchron-sequenzielle Verarbeitungsweise geht depth-first vor. Die Verarbeitung jeder Nachricht fließt sofort soweit nach rechts durch wie möglich.

Breadth-first

Ist der depth-first Fortschritt der richtige, der beste, der einzig wahre Fortschritt für Datenflüsse? Ich glaube nicht. Er mag der naheliegendste sein, doch das scheint mir für eine Bewertung zu wenig. Denn warum soll es richtiger sein als etwas anderes, Nachrichten sofort in der Tiefe zu verarbeiten und dabei Quellfunktionseinheiten darauf warten zu lassen?

Ich denke, zunächst einmal gleichberechtigt ist die breadth-first Verarbeitung von Nachrichten:

image

Die hervorgehobenen Nachrichten zeigen den Unterschied gegenüber dem depth-first Fortschritt. a() erzeugt die Nachrichten b(11), b(12) und b(13) und die werden zuerst komplett abgearbeitet, bevor deren Output dran kommt. Und auch der wird zuerst abgearbeitet, bevor sein Output dran kommt usw. Die am weitesten rechts stehende Funktionseinheit e() erhält also hier als letzte Arbeit, weil sie am tiefsten im Baum liegt als der der Fluss angesehen werden kann, wenn man ihn um 90°dreht.

Die Verarbeitung eilt damit nicht mehr bei jeder Nachricht zum Ende der Verarbeitung, sondern schreitet sequenziell über alle “Flussarme” hinweg fort. Ich stelle mir das als eine breite Welle vor.

image

Vorteil von depth-first ist, dass nach Anstoß eines Flows schnell erste Ergebnisse am Ende heraustropfen. Das bedeutet aber nicht, dass die Verarbeitung von weit vorangeschritten ist. Bei breadth-first hingegen können Sie sicher sein, dass Arbeitsschritte abgeschlossen sind, wenn ihre Ergebnisse verarbeitet werden.

Das fühlt sich für mich mehr nach Datenfluss an: Ein Input kommt bei einer Funktionseinheit “auf dem Tisch”, wird verarbeitet, dabei wird Output erzeugt – und wenn das alles fertig ist, dann geht es bei der nächsten Funktionseinheit weiter.

Zumindest empfinde ich das als “fluss-mäßiger”, wenn ich die Verarbeitung als synchron denke. Weder findet Verarbeitung auf mehreren Threads innerhalb einer Funktionseinheit statt, noch arbeiten mehrere Funktionseinheiten parallel. Wenn Funktionseinheiten ihre Arbeit abschließen können, bevor ihr Output verarbeitet wird, dann sind sie auch hübsch unabhängig von einander.

Synchrone Verarbeitung ist für mich der default beim Flow-Design. Sowohl Flow-Design Implementierungen mit EBC wie mit der Flow Runtime folgen dem auch – auch wenn sie sich in der synchronen Verarbeitung unterscheiden, wie Sie hier sehen.

Dennoch war ich damit nicht zufrieden. Denn dieser breadth-first Fortschritt hat sich in der eingangs erwähnten Anwendung als etwas merkwürdig angefühlt. Warum?

Solange am Anfang eines Flusses nur eine Nachricht steht, macht breadth-first kein Problem. Dann läuft die große Welle langsam in die Tiefe.

Falls auf a(1) jedoch noch a(2), a(3) usw. folgen und a(i) nicht komplett in der Tiefe verarbeitet ist, bevor a(i+1) eintrifft, kann es zum Stau kommen [1]. Es geht dann zwar alles ganz gerecht zu im Sinne sequenzieller Verarbeitung. Doch solche Gerechtigkeit ist nicht in allen Fällen wünschenswert. Manchmal wäre es gut, wenn Nachrichten einander überholen könnten – zumindest wenn sie in unterschiedlichen Flussarmen fließen. Warum muss ein d(2…) auf ein e(1…) zwangsläufig warten?

Round-robin

Angesichts des merkwürdigen Verhaltens der Anwendung habe ich einen Mittelweg zwischen depth-first und breath-first Verarbeitung gesucht. Eingefallen ist mir eine round-robin Verarbeitung von Nachrichten. Um das zu verstehen, hier die grundsätzliche Arbeitsweise der Flow Runtime:

image

Nachrichten kommen von außen zur Flow Runtime, die sie asynchron verarbeitet. Nachrichten, die bei der Verarbeitung entstehen, fließen entweder hinaus, weil sie Endergebnisse darstellen – oder sie fließen zurück in die Runtime, um von folgenden Funktionseinheiten verarbeitet zu werden. a(i) ist eine Nachricht, die von außen zur Runtime kommt. b(i) usw. sind Nachrichten, die die Runtime quasi an sich selbst schickt.

Jede Nachricht, die bei der Runtime eintrifft, wird auf deren einzigem Thread abgearbeitet; das symbolisiert der Kreis in der Funktionseinheit. Sie ist insofern autonom gegenüber ihrer Umwelt.

Immer wenn eine Funktionseinheit eine Nachricht verarbeitet hat, schaut die Runtime nach, ob weitere Nachrichten zur Verarbeitung anliegen. Die stehen in einer Queue, über die die Runtime in einer Schleife läuft. Der Inhalt dieser Queue sieht für das Beispiel über die Zeit so aus (von links wird angehängt, von rechts entnommen):

image

Das erklärt die breadth-first Verarbeitung: Jede Funktionseinheit wird für eine Nachricht abgearbeitet und stellt ihren Output ans Ende der Queue. Der kommt dann erst dran, wenn der Output vorheriger Funktionseinheiten verarbeitet wurde.

Dieses System habe ich nun aufgebrochen, indem nun jede Funktionseinheit eine eigene Queue besitzt:

image

Über diese vielen Queues läuft nun die Flow Runtime im round-robin Verfahren. Das bedeutet, für jede Nachricht geht sie eine Queue weiter. Es entsteht folgendes Muster:

image

Sie sehen, die Verarbeitung wird Nachricht für Nachricht gleichmäßig über die Funktionseinheiten verteilt. Die Verarbeitungsreihenfolge hat im Grunde nichts mehr mit der Tiefe einer Funktionseinheit im Fluss zu tun. Wo Output erzeugt wird, da wird er auch abgearbeitet.

Käme nun ein a(2) zwischendurch an, so würde es alsbald zur Verarbeitung gebracht, wenn seine Queue an der Reihe ist. Es müsste nicht warten, bis alles, was vorher schon aufgelaufen war, abgearbeitet ist.

Dieses Verfahren scheint mir noch gerechter als breadth-first. Es hat allerdings eine Besonderheit, derer man sich bewusst sein muss: Aufs Ganze betrachtet, erfolgt die Abarbeitung der Nachrichten nicht mehr notwendig streng in Erzeugungsreihenfolge. Nachrichten können einander überholen: e(1112) wird zum Beispiel vor b(13) verarbeitet.

Bei depth-first Fortschritt ist b(13) noch nicht erzeugt, wenn e(1112) abgearbeitet wird. Bei breadth-first wurde b(13) erzeugt und schon abgearbeitet lange vor e(1112). Bei round-robin jedoch steht b(13) noch unverarbeitet in der b()-Queue, während e(1112) schon in Arbeit ist.

Asynchron im Kreis

Auch bei round-robin findet innerhalb der Flow Runtime noch keine Parallelverarbeitung statt. Trotzdem geht es überall voran, sobald die Runtime Gelegenheit hat, eine Nachricht zu verarbeiten. Das ist kein pre-emptive Multitasking, weil ja jede Funktionseinheit so lange an einer Nachricht herumlaborieren darf, wie sie mag. Insgesamt auf den ganzen Fluss gesehen, fühlt es sich dennoch so an, als würde quasi parallel gearbeitet.

Richtig ernst wird das, wenn einzelne Funktionseinheiten asynchron arbeiten. Dann kann der Output während ihrer Laufzeit von der Runtime schon weiterverarbeitet werden.

imageBeispielhaft setze ich mal b() auf asynchrone Verarbeitung, damit Sie sehen, wie sich das Muster dann verändern könnte. Die Länge der Nachrichtenkästen soll nun die Verarbeitungsdauer andeuten.

 

image

Jetzt kommt es natürlich auch darauf an, wann b() Output erzeugt. c(111) wird parallel abgearbeitet und erzeugt e(1111). Währenddessen arbeitet b() weiter! Wann fließt dort aber d(111) heraus? Während c() am Werk ist und e(1111) generiert oder erst später? Denn danach richtet sich, ob auf c() unmittelbar e() folgt wie im Bild oder zuerst d().

Fazit

Die Verarbeitung in Datenflüssen ist anders als die in Servicehierarchien. Anders, doch deshalb nicht schlechter. Sie müssen sich umgewöhnen. Das mag schwer fallen, weil die “Stack-Denke” so tief in uns allen drin steckt. Doch ich meine immer noch, dass sich das lohnt.

Denn anders bedeutet hier chancenreich. So strange das Verhalten der eingangs erwähnten Anwendung war, es hat mich wieder beeindruckt, wie leicht die Flow-Operationen zu testen waren, weil sie unabhängig von einander sind. Und es war ganz leicht, individuell für jede zu entscheiden, ob sie synchron oder asynchron laufen soll.

Und letztlich finde ich es auch gut, überhaupt die Wahl zu haben zwischen Verarbeitungsweisen. Die könnte eine Runtime womöglich sogar zur Auswahl anbieten. Sogar den depth-first Fortschritt hatte ich schon einmal implementiert.

Fußnoten

[1] Dass weitere Nachrichten vor Abarbeitung beim Fluss eintreffen, setzt natürlich bei aller Synchronizität seiner Funktionseinheiten voraus, dass der Fluss als Ganzes gegenüber seiner Umwelt asynchron arbeitet. Das ist bei der Flow Runtime der Fall.

5 Kommentare:

Mike Bild hat gesagt…

Sehr interessant und riecht ein bisschen nach einem Event-Loop wie bei Dispatch-Queues, den Rx-EventLoopScheduler oder auch nach dem Prozessmodell von NodeJS oder nginx.

Spannend ist natürlich Wie und Wo einer möglicher Synchronisierungspunkt in der nonblocking Ausführung erfolgt und ob diese Art der Ausführung möglicherweise nur bei Resourcenintensiven "IO Kontakt" bzw. "Side-Effects" nötig ist. Ohne "IO Kontakt" ist IMHO stack-based völlig ausreichend.

Schön wäre hier eine Designhilfe bzw. Entscheidung "per Default" wie auch Dein "alles ist synchron/single threaded" in einem Framework.

BG - Mike

Ralf Westphal - One Man Think Tank hat gesagt…

@Mike: Naja, das kann man halt kaum anders machen. Über eine Queue werden Sender und Empfänger entkoppelt. Dadurch entsteht die Asynchronizität.

So arbeitet zum einen die Flow Runtime im Ganzen gegenüber ihrer Umwelt. Aber so arbeiten auch async bzw. parallel operations innerhalb eines Flows in der Runtime.

Seit dem Blogartikel ist die Runtime aber schon weiter. Man kann jetzt die Verarbeitungsweise (das Scheduling) einstellen. Es gibt die im Artikel beschriebenen Modi breadth-first und round-robin.

Darüber hinaus kann die Runtime nun aber auch depth-first arbeiten.

Alle diese Modi sind async gegenüber dem Aufrufer.

Zusätzlich gibt es den Modus sync-depth-first, d.h. die Runtime ist komplett sync zum Aufrufer.

In Flows kann natürlich weiterhin jede Operation async/parallel geschaltet werden.

Für IO muss ich mir mal was überlegen. Entweder ist das unterhalb des Radars der Runtime. Wenn eine Operation IO macht, der async laufen kann, dann muss sie halt zusehen, wie sie das hinkriegt.

Oder ich bohre die Runtime Config auf, dass man da async Methoden (nach dem Pattern des .NET Fx) als Operationen registrieren kann.

Letzteres wäre irgendwie cool. Aber andererseits fühlt es sich so an, dass dann die Operationen seeehr feingranular würde. Da weiß ich nicht, ob das so schlau wäre.

Mike Bild hat gesagt…

@Queues: Jepp, passt!

@Scheduler-Model: Finde ich überzeugend.

@Konzeptionelle Ausrichtung:

Ich finde den Ansatz eines Computation-Frameworks/Runtime schon irgendwie passend. Denn das ist es aus meinem jetzigen Leserblickwinkel erstmal. IO/Side Effect Handling müsste, wie in Node.js auch, dann auch elementarer Teil der Runtime sein. Node.js löst das ja mit entsprechenden Modulen für nonblocking File-, Network-, usw. Access. Es steht mir natürlich frei eigene nonblocking Module zu entwerfen. Der "default" ist aber, verwende unsere Module und konstruiere Deine nonblocking computation mit Deiner atomaren stack based - Das Model muss so und transformiert werden - Logik. Nett wäre sicher eine Art Actor Model bzw. Petri-Net Runtime die mir IO/Shared-State Access per Design abnimmt. Träume in .NET ;-) BG - Mike

herbivore hat gesagt…

Hallo Ralf,

ich finde dieses spezielle Thema sehr spannend. Mich hat schon immer die MessageQueue in Windows fasziniert und ich habe schon lange überlegt, ob man das Prinzip für eigene Programm(architekturen) nutzen kann - leider ohne praktisches Ergebnis. Nun kann ich aber wenigstens etwas, dass ich aus den Überlegungen gelernt habe, hier anbringen. Dass man nicht nur eine globale Strategie braucht, sondern regelmäßig lokal entscheiden können muss, ob die folgende Operation synchron (als direkter Aufruf) oder asynchron (als Auftrag in die Queue) erfolgen soll. Genau so, wie man diese Entscheidung bei Windows mit SendMessage und PostMessage treffen kann (siehe dazu auch "[FAQ] Bestimmte Aktionen bis nach der laufenden GUI-Event-Behandlung verzögern", http://www.mycsharp.de/wbb2/thread.php?threadid=103960). Denn nur durch die Verwendung von SendMessage kann man sicherstellen, dass alle Nachrichten, die z.B. zu einem Fokuswechsel gehören, hintereinander weg verarbeitet werden, selbst wenn schon neue Benutzereingaben in der Queue warten. Würden einfach *alle* Nachrichten *immer* in die Queue(s) gestellt werden, würde es eine unübersichtliche und Fehler verursachende Verzahnung über mehrere Benutzereingaben hinweg geben. Das trifft natürlich vor allem dann zu, wenn die einzelnen Operationen (globale) Statusveränderungen durchführen, z.B. ein Form aktualisieren oder Ausgaben in eine Datei schreiben oder Datenbankänderungen durchführen.

herbivore

Ralf Westphal - One Man Think Tank hat gesagt…

@herbivore: Du kannst für jede Operation in einem Flow entscheiden, ob sie sync, async oder parallel laufen soll, wenn du die Runtime benutzt.

Kommentar veröffentlichen

Hinweis: Nur ein Mitglied dieses Blogs kann Kommentare posten.