NIS2 & DORA Compliance sichern – Secure Java Code Upgrade

Abstrakte digitale Grafik mit bunten Linien, Knotenpunkten und geometrischen Formen in Blau- und Grüntönen.
  • Software-Entwicklung

Asynchrone, event-driven Workflows mit Jakarta EE

Wie orchestrierst du mehrstufige Use Cases ohne enge Kopplung? Wir bauen mit Jakarta EE einen asynchronen, event-driven Workflow: n+1 Queues, StateTransition, JPA-Entities für den Fortschritt – plus Code-Beispiele, die du sofort einsetzen kannst.

Asynchrone, event-driven Workflows mit Jakarta EE

Für ein Kundenprojekt mussten wir einen asynchronen Arbeitsablauf mit mehreren Schritten implementieren. Man kann sich die Anwendung wie in der folgenden Grafik vorstellen. Es gibt 2 Hauptpfade, hier in Blau bzw. Grün dargestellt. Beim Start des asynchronen Prozesses kann gewählt werden, welcher Pfad genommen wird. Tritt in diesem Beispiel bei einem Arbeitsschritt des blauen Pfades ein Fehler auf, wird als Fallback auf den grünen Pfad ausgewichen.

 

 

Für den Endnutzer der Anwendung sind bei der Applikation 2 Funktionen wichtig:

  1. Senden von Daten, welche im asynchronen Prozess abgearbeitet werden
  2. Abfrage in welchem Arbeitsschritt sich die in Schritt 1 gesendeten Daten gerade befinden 

 

Herausforderungen in asynchronen Jakarta-EE-Workflows

Um einen möglichst modularen Ansatz für die beschriebene Art der Applikation zu schaffen, gibt es folgende Problemstellungen die es zu lösen gilt:

  1. Wie werden die Aufträge entgegengenommen?
  2. Wie wird der asynchrone Prozess gestartet?
  3. Wie gehen die Aufträge von einem Arbeitsschritt zum nächsten über?
  4. Wie ermöglicht man es den Nutzern der Anwendung, den Status der aus Punkt 1 übertragenen Aufträge jederzeit (auch während des asynchronen Prozesses) abzufragen?

 

Architektur & Lösungsansatz: JMS-Queues und State Transition

Um die Lösung besser veranschaulichen zu können, werden die einzelnen Elemente anhand eines Minimalbeispiels erklärt (das vollständige Minimalbeispiel findet sich auf GitHub). In diesem wird der in der Einleitung beschriebene Workflow umgesetzt. 

Um den Fokus auf dem asynchronen Workflow zu behalten, machen die einzelnen Arbeitsschritte nichts anderes, außer den Auftrag zum jeweils nächsten Arbeitsschritt weiterzuschieben. Die folgende Abbildung zeigt  eine etwas detailliertere Darstellung des Workflows im Minimalbeispiel.

 

 

Jeder Arbeitsschritt liefert dabei das Ergebnis OK oder ERROR. Anhand dieses Ergebnisses wird entschieden, welcher der nächste Arbeitsschritt (in weiterer Folge auch Use-Case) ist, der ausgeführt werden muss. Initial kann der Nutzer auch auswählen, ob die Abarbeitung in Pfad 1 oder direkt in Pfad 2 begonnen werden soll. 

Datenmodell für Workflow-Status (JPA)

Um die Ergebnisse der Use-Cases zu speichern und für weitere Verarbeitung zugänglich zu machen, werden sowohl der Auftrag des Nutzers und auch die Ergebnisse in einer Datenbank gespeichert. Die folgende Abbildung zeigt das Datenmodell, welches sowohl im Code (als JPA-Entities) als auch in der Datenbank für das Minimalbeispiel verwendet wird. 

FlowExecution und PredefinedResult repräsentieren dabei den Auftrag des Nutzers, während FlowState die einzelnen Arbeitsschritte und deren Ergebnis, bzw. aktuellen Status im Workflow darstellen.

 

 

Die folgende Tabelle beschreibt die einzelnen Felder des Datenmodells:

 

Die angegebenen Enumerations InitialPath und FlowAction repräsentieren in diesem Kontext einfach die Use-Cases bzw. die beiden Pfade aus dem Beispiel. 

Das Result-Enum repräsentiert die Zustände welche ein Use-Case gerade haben kann:

  • UNPROCESSED → Auftrag wurde im zugehörigen Use-Case noch nicht bearbeitet oder ist gerade in Bearbeitung
  • OK → Auftrag wurde im Use-Case bearbeitet und mit Ergebnis OK abgeschlossen
  • ERROR → Auftrag wurde im Use-Case bearbeitet und mit Ergebnis ERROR abgeschlossen

 

Asynchrone Abarbeitung mit JMS-Queues & Routing

Wie funktioniert nun die asynchrone Abarbeitung des Workflows? 

Wie aus dem Titel herauszulesen ist, wird dafür ein Eevent basierter Ansatz gewählt. Im konkreten Fall wird dafür eine Message Queue verwendet, um die Aufträge innerhalb des Workflows weiterzureichen. 

Insgesamt gibt es n+1 verschiedene Queues, wobei n die Anzahl der Use-Cases ist. Es gibt also für jeden Use-Case eine eigene Queue. Die zusätzliche Queue wird dafür verwendet, um die Weiterreichung innerhalb des Workflows zu managen.

In der folgenden Abbildung ist die Verbindung zwischen den Use-Cases, der StateTransition-Implementierung und den Message Queues dargestellt.

 

 

Ist ein Use Case mit seiner Arbeit fertig, schickt er eine Message in die State Transition Queue. Diese Message wird dann von der StateTransition-Implementierung aufgegriffen und diese kümmert sich dann darum, dass der Auftrag an den nächsten Use-Case (je nach Ergebnis des vorigen Use-Cases)  weitergereicht wird, indem eine Message auf die entsprechende Queue gelegt wird.

Wenn ein Auftrag von der Applikation initial entgegengenommen wird, wird ebenfalls in die State Transition Queue für die StateTransition eine Message gesendet. D.h. die StateTransition-Implementierung kümmert sich auch um die Auswahl des ersten Use-Cases im Workflow.

Die folgende Abbildung zeigt anhand eines Beispiels, wie das Zusammenspiel zwischen den einzelnen Applikationsteilen und den Message Queues bei der Abarbeitung eines Workflows funktioniert.

Hier ist auch ersichtlich, dass die StateTransition-Implementierung immer auch am Ende einer asynchronen Abarbeitung aufgerufen wird. Wird dabei durch das Ergebnis des letzten Use Cases festgestellt, dass der Workflow zu Ende ist, wird keine Message an eine der anderen Queues geschickt. Dadurch, dass für den Auftrag nun keine Messages mehr im Umlauf sind, ist auch der asynchrone Ablauf zu Ende.

 

 

Persistenz & Messageformat: JPA-Entities, JMS ObjectMessage

Was wird nun von den einzelnen Use-Cases bzw. der StateTransition-Implementierung an die Message Queue übergeben und wie ist es für die Nutzer der Applikation möglich den Fortschritt des gesendeten Auftrags im asynchronen Ablauf abzufragen?

Es gibt vermutlich mehrere Arten, wie dieses Problem gelöst werden kann. Im Falle der Beispielapplikation wurde es folgendermaßen gelöst:

  • Alle Informationen, welche den ursprünglichen Auftrag (und auch evtl. durch Use-Cases ergänzte Informationen) betreffen, werden in der Datenbank gespeichert.
  • In den Queues wird nur die technische ID der Root-Entity (im Beispiel FlowExecution) weitergereicht. Das bedeutet, die Messages enthalten nur einen einzigen Wert.
  • Die Use-Cases holen sich alle für ihre Abarbeitung notwendigen Informationen aus der Datenbank und speichern auch das Ergebnis dorthin zurück
  • Die StateTransition-Implementierung arbeitet ebenso und bekommt alle notwendigen Informationen zur Entscheidung über den nächsten Arbeitsschritt aus der Datenbank (im Beispiel ist die Entscheidungsgrundlage immer der letzte Use-Case und dessen Ergebnis)
  • Dadurch, dass alle Informationen in der Datenbank gespeichert sind, ist auch zu jedem Zeitpunkt eine Abfrage über den derzeitigen Fortschritt des Workflows möglich.

 

Implementierung mit Jakarta EE (WildFly/ActiveMQ, JSF/REST)

 

Auftragsannahme & Workflow-Start (Persist + Queue)

Für die Annahme eines Auftrags wurde ein eigenes Service implementiert, welche ein fertiges FlowExecution-Objekt übergeben bekommt und danach genau 2 Arbeitsschritte macht

  1. Persistierung des FlowExecution-Objekts
  2. Versenden  einer Message mit der technischen ID der FlowExecution an die State Transition Message Queue

 

public Long persistAndStartFlowPath(FlowExecution flowPath) {
 FlowExecution persistedFlowExecution = 
 				flowExecutionDao.persistFlowExecution(flowPath);
 eventPublisher.sendEvent(persistedFlowExecution.getId(), "STATETRANSITION");
 return persistedFlowExecution.getId();
}

 

Dem Service an sich ist es egal, von wo aus er aufgerufen wird. D.h. es ist auch möglich, mehrere verschiedene Möglichkeiten wie bspw. eine Frontend-Maske oder eine REST-Schnittstelle zu implementieren. Im Code-Beispiel auf GitHub ist es eine einfache JSF-Seite mit einem Controller, welche den Service aufruft.

Events erstellen & senden: JMSProducer und Queue

Zur Erstellung der Events und dem Versenden einer Message an die Message Queue gibt es wiederum eine eigene Implementierung, welche in weiterer Folge von allen Use-Cases, der StateTransition-Implementierung und auch dem Service zum Erstellen neuer Aufträge verwendet wird.

Die Kernaufgaben sind:

  1. Erstellen der JMS-Message
  2. Senden der Message an die entsprechende Queue

 

public void sendEvent(Long flowExecutionId, String queueName) {
 try {
   final ObjectMessage message = jmsContext.createObjectMessage();
   message.setObject(flowExecutionId);


   final JMSProducer jmsProducer = jmsContext.createProducer();
   jmsProducer.send(getQueue(queueName), message);
 } catch (JMSException e) { /* Handle Exception */ }
}

Message-Verarbeitung mit MDBs (@MessageDriven)

Wie schon erwähnt, gibt es n+1 (wobei n die Anzahl der verschiedenen Arbeitsschritte ist) verschiedene Queues. Für jede Queue (also für jeden Use-Case und die StateTransition-Implementierung) muss es jeweils einen Mechanismus geben, der erkennt, dass etwas zu tun ist.

Hierfür werden Message Driven Beans verwendet. Jeweils eine Implementierung für jeden Use-Case und eine für die State Transition. Durch korrekte Annotierung, die Implementierung des Interfaces MessageListener und der entsprechenden onMessage-Methode wird somit der Ablauf durch Befüllen und Entnahme von Messages auf den  Queues gesteuert. Auch eine parallele Abarbeitung mehrerer Aufträge wird dadurch erreicht.

Im folgenden Codeblock ist beispielhaft die Implementierung für das StateTransitionMDB dargestellt.

 

@MessageDriven(name = "STATE_TRANSITION_MDB", activationConfig = {
   @ActivationConfigProperty(propertyName = "destinationLookup",
                       	  propertyValue = "queue/STATETRANSITION"),
   @ActivationConfigProperty(propertyName = "destinationType", 
 				  propertyValue = "jakarta.jms.Queue"),
   @ActivationConfigProperty(propertyName = "acknowledgeMode",
 				  propertyValue = "Auto-acknowledge")})
public class StateTransitionMDB implements MessageListener {


 private static final Logger logger = Logger.getLogger(MethodHandles.lookup().lookupClass().getName());


 @Inject
 private StateTransition stateTransition;


 public void onMessage(Message rcvMessage) {
   try {
     if (rcvMessage instanceof ObjectMessage) {
       Long flowExecutionId = (Long) ((ObjectMessage) rcvMessage).getObject();
       logger.info("Received message with flowExecutionId: " + flowExecutionId);


       stateTransition.execute(flowExecutionId);
     }
   } catch (JMSException e) {
     throw new RuntimeException(e);
   }
 }
}

State Transition: nächsten Use Case bestimmen

Statusübergänge werden gemacht, indem die StateTransition-Implementierung sich das Ergebnis des letzten ausgeführten Use-Cases aus der Datenbank holt und daraus den nächsten Use-Case berechnet, der im Workflow vorgesehen ist.

Hier kann man dann noch 2 Fälle unterscheiden: Es gibt einen nächsten Use-Case oder die Abarbeitung ist zu Ende.

Im Falle, dass kein nächster Use-Case notwendig ist, macht die Implementierung hier nichts weiter und erzeugt keine weiteren Events. D.h. die Abarbeitung des Auftrags ist hier zu Ende.

Wird allerdings festgestellt, dass ein weiterer Use-Case im Workflow notwendig ist, wird ein neues FlowState-Objekt zum Auftrag hinzugefügt. Dieses neue Objekt muss als Result immer den Wert UNPROCESSED haben. Dadurch wird der Applikation (und möglichen Statusabfragen) mitgeteilt, dass der nächste Use-Case vorgesehen, aber noch nicht fertig abgearbeitet wurde.

Zusätzlich zum Erstellen und Persistieren des neuen FlowState-Objekts wird eine Event-Message auf die entsprechende Queue für den nächsten Use-Case gelegt.

Im Beispiel sind die möglichen Statusübergänge in einer Map hinterlegt. Es wäre allerdings möglich, diese Map durch eine komplexere Methode (falls notwendig) zu ersetzen.

Der folgende CodeBlock zeigt den Hauptteil der StateTransition-Implementierung:

 

public void execute(Long flowExecutionId) {
 FlowExecution flowExecution = flowExecutionDao.getFlowExecutionById(flowExecutionId);


 FlowState nextState = calculateNextFlowState(flowExecution);


 if (Objects.nonNull(nextState)) {
   nextState.setFlowExecution(flowExecution);
   flowExecution.getFlowStates().add(nextState);
   flowExecutionDao.updateFlowExecution(flowExecution);


   eventPublisher.sendEvent(flowExecution.getId(), 
 			     nextState.getAction().getQueueName());
 }
}

 

Use Cases: Ausführen, Ergebnis (OK/ERROR) & Event-Dispatch

Die Use-Cases haben folgende Aufgaben:

  1. Ausführen von Aktionen die für den jeweiligen Arbeitsschritt notwendig sind
  2. Berechnen eines Ergebnisses (im Beispiel OK oder ERROR)
  3. Speichern des Ergebnisses in der Datenbank
  4. Senden eines Events an die State Transition Queue.

Punkt 1 ist in der Beispielanwendung für alle Use-Cases gleich: Es wird 5s gewartet, um einen Arbeitsschritt zu simulieren.

Punkt 2 funktioniert in der Beispielanwendung so, dass beim Senden des Auftrags bereits mitgegeben wird, welches Ergebnis im entsprechenden Arbeitsschritt erzeugt werden soll. In einem Echtfall würde sich das Ergebnis evtl. aus den Operationen in Punkt 1 ergeben.

So  könnte eine entsprechende Implementierung aussehen:

 

public void execute(Long flowExecutionId) {
 getLogger().info("Executing use case for flow execution with id " + flowExecutionId);


 FlowExecution flowExecution = flowExecutionDao.getFlowExecutionById(flowExecutionId);


 performAction(flowExecution);


 FlowState lastState = ListUtils.getMax(flowExecution.getFlowStates(), 
 			Comparator.comparing(FlowState::getId));
 lastState.setResult(calculateUseCaseResult(flowExecution, lastState));
 lastState.setTimestamp(LocalDateTime.now());


 flowExecutionDao.updateFlowExecution(flowExecution);
 eventPublisher.sendEvent(flowExecutionId, "STATETRANSITION");
 getLogger().info("Use case for flow execution with id " + flowExecutionId + " 
 			executed");
}

 

Vorteile: Entkopplung, Skalierbarkeit & Wartbarkeit

Durch das Verwenden einer Message Queue und das Navigieren durch den Workflow mit Events lassen sich die einzelnen Use-Cases gut voneinander entkoppeln.

Auch wenn im Beispiel die interne ActiveMQ eines Wildfly-Servers verwendet wird und ein einzelnes war-File, welches die komplette Applikation beinhaltet, deployed wird, wäre es möglich, mit demselben Ansatz alle Use-Cases einzeln voneinander zu deployen. 

Auch in unserem Beispiel sprechen die einzelnen Use-Cases nur über die MessageQueue bzw. auch die Datenbank miteinander.

Durch diese Entkopplung wird es auch wesentlich einfacher, den Workflow im Code abzubilden, da jeder Use-Case für sich betrachtet werden kann. Sollte sich also an einem Arbeitsschritt etwas ändern, muss nur dieser angepasst werden und nicht der komplette Workflow.

Auch das Hinzufügen neuer Arbeitsschritte, oder eine Änderung der Reihenfolge dieser ist ohne Änderung der Arbeitsschritte möglich. Dazu muss nur die StateTransition-Implementierung erweitert werden. 

Durch die Verwendung von Message Queues, Message Driven Beans und dem Abspeichern der Status in der Datenbank lässt sich sehr einfach ein asynchroner Workflow in Gang setzen, der einfach adaptierbar, erweiterbar und somit auch gut wartbar ist. 

Möchtest du deinen Use-Case als PoC auf die Schiene bringen?

Wir klären Zielbild, Randbedingungen und Systemkontext, skizzieren den Event-Flow (Queues, State Transition, DB-Status-Tracking), definieren Erfolgskriterien und den PoC-Scope mit priorisiertem Backlog. Du erhältst eine prägnante Architektur-Skizze, eine realistische Aufwandsschätzung und einen klaren Fahrplan bis zum Prototyp. Optional: NDA und Code-Snippets für den schnellen Start.

geschrieben von:
Christian Salmhofer
WordPress Cookie Plugin von Real Cookie Banner