Follow my new blog

Dienstag, 7. September 2010

Die Power von Rx für Event-Based Components

Ob Event-Based Components (EBC) auch mit den Reactive Extensions, dem Rx Framework von Microsoft implementiert werden könnten, stand als Frage schon länger im Raum. Jetzt hab ich mich mal daran versucht.

Als Beispiel habe ich die FizzBuzz Kata gewählt. Der EBC-Entwurf ist denkbar simpel:

image

Bisher wurde dann eine EBC-Aktion wie Zahl transformieren so übersetzt:

   1: class Zahl_transformieren_früher
   2: {
   3:     public void In_Zahl(int zahl)
   4:     {
   5:         ...
   6:     }
   7:  
   8:     public event Action<string> Out_Symbol;
   9: }

Output-Pins von EBC-Aktionen waren Events, Input-Pins Methoden. Das ist eine geradlinige Übersetzung. Funktioniert in der Praxis tadellos.

Und doch bleibt etwas zu wünschen übrig. Es gibt einfach keine Unterstützung für Operationen auf Folgen von Events, also Nachrichtenströme. Natürlich kann man Aktionen wie Filter in einen Draht “einklemmen” – doch die müssen relativ umständlich erst entwickelt werden.

Mit Rx geht das jedoch viel einfacher. Damit kann man einen Event-Strom filtern oder verzögern oder zwei Ströme zusammenführen usw. usf. Viele schöne Operationen auf Events – oder eben Observables – gibt es dort. Warum das nicht für EBC nutzen?

Deshalb hier mein Vorschlag für eine Übersetzung von EBC-Flows mit Rx-Mitteln:

* Ein Output-Pin wird in ein IObservable<T> übersetzt
* Ein Input-Pin wird in einen IObserver<T> übersetzt

Mit diesen Interfaces kann man dann wunderschöne Sachen machen.

Aber es sind eben nur Interfaces und so brauchen wir noch Implementationen. Ich nenne die mal OutputPin<T> und InputPin<T>. Ist doch irgendwie naheliegend, oder?


   1: class Zahl_transformieren
   2: {
   3:     public InputPin<int> In_Zahl { get; private set; }
   4:     public OutputPin<string> Out_Symbol { get; private set; }
   5:  
   6:     public Zahl_transformieren()
   7:     {
   8:         this.In_Zahl = new InputPin<int>(Transform);
   9:         this.Out_Symbol = new OutputPin<string>();
  10:     }
  11:  
  12:  
  13:     private void Transform(int zahl)
  14:     {
  15:         if (IsFizzBuzz(zahl))
  16:             this.Out_Symbol.Post("fizzbuzz");
  17:         ...
  18:     }

Um die eigentlichen Transformationsroutine Transform() wird nun ein IObserver<T> gewickelt in Form eines InputPin<T>. Und die Ergebnisse werden an einen OutputPin<T> geschickt, der ein IObservable<T> ist. Den können dann folgende Aktionen abonnieren.

Die Zahlengenerierung sieht analog aus. Kein Hexenwerk. Beide Aktionen verdrahte ich dann noch in einer Aktionsgruppe zu einem Flow, um die Details der Verarbeitung gegenüber einem Client zu verbergen:


   1: public class FizzBuzzer
   2: {
   3:     public InputPin<Unit> In_Generate { get; private set; }
   4:     public OutputPin<string> Out_Symbole { get; private set; }
   5:  
   6:  
   7:     public FizzBuzzer()
   8:     {
   9:         var ze = new Zahlen_erzeugen();
  10:         var zt = new Zahl_transformieren();
  11:  
  12:         this.In_Generate = ze.In_Generate;
  13:         ze.Out_Zahl.Subscribe(zt.In_Zahl);
  14:         this.Out_Symbole = zt.Out_Symbol;
  15:     }
  16: }

Dem Input-Pin der Aktionsgruppe kann der Input-Pin von Zahlen erzeugen zugewiesen werden. Sie braucht also keinen eigenen. Dito für den Output-Pin der Aktionsgruppe.

Und die konsumierende Aktion abonniert die produzierende Aktion, s. Zeile 13. Das entspricht der bisherigen Zuweisung einer Input-Pin Methode als Eventhandler an einen Output-Pin Event.

Formal ist der Unterschied zwischen bisheriger Übersetzung und der nach Rx minimal, finde ich. Das setzt sich dann bis zum Client fort, der die Aktionsgruppe nutzt:


   1: var fb = new FizzBuzzer();
   2:  
   3: fb.Out_Symbole.Subscribe(Console.WriteLine);
   4: fb.In_Generate.OnNext(new Unit());

Ein kleinwenig gewöhnungsbedürftig mag allerdings sein, dass Rx keine Kommunikation ohne Wert kennt. Deshalb muss die Zahlengenerierung mit einem Parameter angestoßen werden. Dankenswerterweise gibt es dafür Unit, sozusagen einen “wertlosen” Typ. Er ist der Funktionalen Programmierung entlehnt und findet sich auch in F# wieder.

Und jetzt zum Gewinn durch eine Umstellung der Übersetzung von EBC mit Rx:


   1: fb.Out_Symbole
   2:   .Where(s => s[0] < '@')
   3:   .Select(s => int.Parse(s))
   4:   .SkipWhile(i => i < 42)
   5:   .Subscribe(Console.WriteLine);

Wir können ganz leicht in den “Eventfluss” eingreifen. Es gibt viele Standardoperationen dafür, allen voran die bekannten und beliebten Linq-Operatoren.

Wie klingt das? Ich finde das sehr vielversprechend. Damit experimentiere ich weiter. Mein Gefühl ist, dass mit Rx die Übersetzung noch systematischer wird und gleichzeitig die Möglichkeiten zum Umgang mit Event-Strömen wachsen. Also ein doppelter Gewinn.

13 Kommentare:

Anonym hat gesagt…

I only can read english, however, I have been doing precisely the same experiments of and on for about 4 months with the RX framework. I think it will be a better solution than using CCR and it works in Silverlight.

Janek Fellien hat gesagt…

Hallo Ralf,

die Sache sieht schon recht gut aus. Gerades das "Subsrcibe" gefällt mir.

Womit ich ein wenig Schwierigkeiten hatte, ist die implizite Syntax in deinem Text. Einige Leute werden mit Aktionsgruppe, Aktion und Flow noch nichts anfangen können. Vielleicht könntest du dazu noch eine kleine Erklärung schreiben.

Ansonsten entspricht das Konzept dem Gedanken, den wir vor Wochen schon entwickelten, dass Input- und OutputPin "sinnvolle" Objekte sein sollten. Das vereinfacht das Verständnis. Schade nur, dass hier die Events auch wieder das Nachsehen haben und der Begriff EBC wieder einmal in Frage gestellt wird.

Also werden wir doch ABC-Schützen ;-)


Jan

Ralf Westphal - One Man Think Tank hat gesagt…

@Jan: Jo, die Events sind nun weg :-) Es lebe der Stream. Denn nichts anderes ist das, was da aus einem IObservable rauskommt.

Die Terminologie mit Action und Flow etc. hab ich einfach mal so benutzt. Auch um zu sehen, wie sie mir gefällt.

Rausgekommen ist für mich: schon ok - aber es fehlt noch was. Die Begriffe gehören dazu. Aber allermindestens fehlt ein Oberbegriff für Action und Action Group. Denn die sind weiterhin von außen nicht zu unterscheiden.

Component geht aus mehreren Gründen nicht mehr. Also: Wie heißen diese Dinger ganz allgemein, die Pins haben. (Von den Pins komm ich irgendwie nicht weg.)

Janek Fellien hat gesagt…

Ja Ja, so ist das mit den Pins, man bleibt einfach dran hängen. Ich bin auch gerade dabei dein Beispiel nach zu empfinden und die Pins sind auch bei mir noch da.

Ich verwende allerdings zum Ausprobieren die in der Google Group angesprochene Struktur und lege mir eine Bibliothek von Actions an, kombiniere die Actions in Activities und verwende diese Activities in der Komponente. Bisher habe ich nicht das ungute Gefühl, für Action und Activity etwas Übergeordnetes zu finden.
Ich meine, dass Action oder Activity von aussen nicht erkennbar sein müssen, da wir immer vom Prozess oder Flow reden, der aus Aktionen besteht.

Wofür brauchst du diesen Begriff?

Ralf Westphal - One Man Think Tank hat gesagt…

@Jan: Actions zusammenfassen zu Activities (oder Action Groups): das ist ok.

Aber eben weil Actions und Activities von außen nicht zu unterscheiden sind, ist ein übergeordneter Begriff nötig.

Analogie: In einem Unternehmen arbeiten Buchhalter und Manager und Pförtner und viele andere. Es ist gut, die mit spezifischen Berufs/Positionsbezeichnungen unterscheiden zu können.

Aber es ist genauso wichtig, sie zusammenfassen zu können. Buchhalter wie Manager und Pförtner sind Mitarbeiter oder Angestellte.

Wenn man eine allgemeine Aussage treffen will, dann sagt man z.B. "Unsere Angestellten sind zufrieden." Das ist simpler als "Unsere Buchhalter und Manager und Pförtner sind zufrieden." Und es ist korrekter als "Unsere Buchhalter sind zufrieden."

Deine Formulierung "vom Flow reden, der aus Aktionen besteht" ist nämlich falsch. Ein Flow besteht eben nicht nur aus Aktionen (in der derzeitigen Terminologie), sondern aus Aktionen und Aktivitäten. Einfacher wäre es zu sagen: "Ein Flow besteht aus XXX." Aber was ist XXX?

Von mir aus ist XXX "Aktion". Aber dann müssen die atomaren XXX eine andere Bezeichnung bekommen.

Janek Fellien hat gesagt…

Hallo Ralf,

gut, ich verstehe, du benötigst es, um die Dinge zusammenzufassen. Dann würde ich im Deutschen die 'Tätigkeit' vorschlagen. Im Englischen müsste man dann das Wort 'Task' verwenden.

Tätigkeit ist der allgemeine Begriff für Aktivität. Sie lässt sich delegieren und ist zielgerichtet, sprich es steht immer eine Erfüllung einer Aufgabe dahinter.

Vielleicht hilft das ja weiter.

Jan

Sebastian hat gesagt…

Hallo Ralf,

interessanter Artikel! Was mich noch interessieren würde: Wie hast du die
OutputPin und InputPin implementiert?

Viele Grüße

Sebastian Betzin

Ralf Westphal - One Man Think Tank hat gesagt…

@Sebastian: Input und Output Pins beschreibe ich mal in der dotnetpro ;-)

Aber wer nicht warten will, der kann hier in Sourcen schauen: http://bit.ly/cgzbuQ

Janek Fellien hat gesagt…

Hallo Sebastian,

ich hatte auch erst diese Frage im Sinn, bis ich mir Rx mal angeschaut habe und zu folgender extrem vereinfachten Implementierung gekommen bin:

a.) Post- und Subscribe-Methode am OutputPin:

public IDisposable Subscribe(IObserver observer) {
this._observer = observer;
return null;
}

Ja, return null ist gar nicht gut und eigentlich sollte man eine Liste von Observern anlegen, aber zum Probieren funktioniert es erst einmal.

public void Post(TOutput output) {
this._observer.OnNext(output);
}


b.) ctor Injection für das Action Mapping

private Action _triggeraction;

public Trigger(Action triggerAction) {
this._triggeraction = triggerAction;
}

Es macht dabei keinen Unterschied ob nun Trigger oder InputPin als Name des Pins verwendet wird ;-)


und c.) Aufruf von OnNext
public void OnNext(TMessage value) {
this._triggeraction(value);
}

Meine Pins sind generisch, damit ich die Typen der Messages steuern kann. Alle anderen Dinge, die noch implementiert werden müssten, habe ich ert einma links liegen gelassen.

Janek Fellien hat gesagt…

Ja gut Ralf, deine Version ist sicherlich korrekter...

Danke für den Link

Jan

Sebastian Betzin hat gesagt…

Ok, war gar nicht so schwer. Hier ist meine Version:

Public Class OutPutPin(Of T)
Implements IObservable(Of T)
Implements IDisposable

Private m_Observer As IObserver(Of T)
Private m_Disposed As Boolean

Public Sub New()
End Sub

Public Function Subscribe(ByVal observer As IObserver(Of T)) As IDisposable Implements IObservable(Of T).Subscribe
m_Observer = observer

Return Me
End Function

Public Sub Post(ByVal Value As T)
m_Observer.OnNext(Value)
End Sub

#Region "IDisposable Support"
Protected Overridable Sub Dispose(ByVal disposing As Boolean)
If Not m_Disposed Then
If disposing Then
m_Observer = Nothing
End If
End If
m_Disposed = True
End Sub

Public Sub Dispose() Implements IDisposable.Dispose
Dispose(True)
GC.SuppressFinalize(Me)
End Sub
#End Region

End Class

Rainer hat gesagt…

Langsam gefällt mir das ganze. Action und Flow finde ich gut.

Inputs und Outputs habe ich noch nicht in Frage gestellt. Was ich bisher störend finde ist Wiring!

Hier machen es andere Programmiersprachen nach meinem Bauchgefühl leicher. Siehe F# und Computation Expressions: http://msdn.microsoft.com/en-us/library/dd233182.aspx
Das ist aber nur mein Eindruck, da ich mich an EBCs noch nicht ausgetobt habe.

Das Konzept scheint schlüssig, die Umsetzung gefällt mir persönlich noch nicht. Bin gespannt, was ihr euch da in Zukunft noch einfallen lasst.

Anonym hat gesagt…

Hallo Ralf

Hatte ausnahmsweise etwas Zeit, mir Deine Beiträge zum Thema EBCs zu Gemüte zu führen und mit den verschiedenen Ansätzen herumzuspielen.

Dieser Ansatz hier mit den Input- und Output-Pin Klassen gefällt mir am besten, mit den Events konnte ich mich irgendwie nicht so recht anfreunden, da habe ich immer so ein ungutes Gefühl von wegen Memory-Leaks.

Da mich die Parameter vom Type "Unit" gestört haben, implementierte ich noch eine dritte Infrastrukturklasse namens FirePin (mir gingen noch die Namen TriggerPin und StarterPin durch den Kopf, bin noch nicht schlüssig, was der beste Namen ist, und auch nicht, ob es gut ist, noch einen Pin-Typ mehr einzuführen oder ob ich mich selber mal anpassen sollte anstatt den Code...).

Egal, hier ist er:


using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace ebc.infrastructure {

public class FirePin : InputPin {

//Constructors

public FirePin(Action aProcessor)
: base(new ProcessorAdapter(aProcessor).OnAction) {
}

public FirePin(Action aProcessor)
: base(aProcessor) {
}

public FirePin(IObserver anObserver)
: base(anObserver) {
}

//Public Methods

public void Fire() {
OnNext(new Unit());
}

//*********************************************************************
// INNER CLASS: ProcessorAdapter
//*********************************************************************

private class ProcessorAdapter {

//Private Fields
private Action _Action;

//Constructors

public ProcessorAdapter(Action aProcessor) {
_Action = aProcessor;
}

//Public Methods

public void OnAction(Unit aUnit) {
_Action.Invoke();
}

}

}

}

Danke + Gruess
Christoph Hafner