# Microservices in Java Spring: Nebenläufigkeitsprobleme lösen



## Standardw (15. Okt 2020)

Hallo liebe Gemeinde,
ich schreibe zur Zeit eine Studienarbeit und muss dazu ein bestehendes Microservice-System erweitern.

Das Microservice-System ist mittel Java Spring umgesetzt. In einem Service werden spezielle Nachrichten an Embedded Devices versendet. An welche Geräte konkret gesendet wird, entscheidet der Benutzer (eine einfache Liste an Geräten). Im System soll es ab jetzt dabei eine Vorgabe geben, sodass nicht alle Nachrichten gleichzeitig, sondern nur bestimmte Geräte in einer bestimmten Reihenfolge benachrichtigt werden (um eine Auslastung der Infrastruktur zu verhindern). Die Liste der abzuarbeitenden Geräte liegt in einer MongoDB.
Wird eine Nachricht gesendet, kommt eine Antwort "irgendwann" zurück, und wird vom System an den Service durchgeleitet. Bei einer zukünfitgen Skalierung des Systems kann es geschehen, dass diese Antwort von einer andere Instanz empfangen wird, als von derer, die die ursprüngliche Nachricht versendet hat. 

Da nun nicht alle Nachrichten auf einmal gesendet werden dürfen, muss nach der intialien Auswahl auch regelmäßig nachgeprüft werden, ob wieder Nachrichten verschickt werden dürfen. Meine Idee ist diese: Kommt eine Antwort an, wird geprüft, welche Geräte eine Nachricht erhalten werden. Somit ist die Auslastung maximal.

Das generelle Problem, welches ich dabei habe: Kommen zwei Antworten an zwei Instanzen "gleichzeitig" an, würden beide ggf. das gleiche Gerät zweimal benachrichtigen (nicht erlaubt) oder zu viele Geräte benachrichtigen (würde ggf. zur Überlastung führen). Ich dachte nun also daran, dass sobald eine Antwort eintrifft, eine Transaktion zu starten (d.h. eine komplette Methode dabei auszuführen). Dort wird der Algorithmus abgearbeitet, und bei den Geräten, die benachrichtigt werden dürfen, ein SCHEDULED-flag zu setzen. Kommt fast-parallel dazu eine andere Anwort im System an, wird der Prozess durch die laufende Transaktion des anderen Prozesses blockiert. Sobald er arbeiten darf, sieht er, dass manche Geräte schon auf SCHEDULED stehen und überspringt diese: Es werden keine doppelten Nachrichten versendet, alles gut.

Das eigentliche Problem: Bei einer großen Anzahl an Geräten, könnte es sein, dass viele Antworten (auf einmal) kommen. Dann würden auf einmal z B. 100 Prozesse entstehen, die warten, bis sie weitermachen dürfen. Im worst-case würde sich die Anzahl immer weiter aufstauen, und eine unnötig große Anzahl von Prozessen erschaffen werden. Wären alle Nachrichten versendet, stünden dann trotzdem noch hunderte Prozesse und würden komplett durchlaufen, auch wenn eigentlich kein Bedarf mehr besteht, da die Liste schon abgearbeitet wurde. Im kleinen mag das vielleicht gehen, aber bei zehn- oder sogar hunderttausenden Geräten ist das sicher kein sinnvoller Ansatz.

Die Lösung, einfach abbzubrechen, falls schon ein anderer Prozess läuft, ist auch nicht verlässlich: Kommt die Antwort des vorletzten Gerätes rein, und es läuft ein Prozess, wird einfach abgebrochen. Da der schon laufende Prozess aber noch denkt, dass das vorletzte Gerät nicht fertig ist, wird das letzte Gerät von ihm nicht benachrichtigt; dann ist der Prozess fertig, und weil keine Antwort mehr kommt, wird das letzte Gerät niemals benachrichtigt.

Sicher könnte ich mir etwas mit verschiedenen weitere Status-Flags zusammenfrickeln - ich bin aber auf der Suche nach einer "richtigen" Lösung. Leider hatte ich bisher noch keine Vorlesung zum Concurrent Programming. 

Hat jemand Ideen, Vorschläge oder Kritik? Ich suche keine fertige Lösung, sondern a) eine Bewertung meiner generellen Idee zur Lösung des Benachrichtigungshandling und b) eine Richtungsweisung um meine Probleme zu lösen (soweit sie nach a) noch relevant sind)

Liebe Grüße


----------



## LimDul (15. Okt 2020)

Ich verstehe gerade nicht, warum das Empfangen der Auslöser sein soll für das Senden? Warum sind die derartig eng gekoppelt, das nur gesendet wird, wenn was empfangen wird? Und warum muss das auch der gleiche Prozess sein? Wenn es darum geht, dass nur eine gewisse Zahl an Nachrichten gleichzeitig unterwegs sein darf, bietet sich dafür eine zentrale Instanz in Art einer Semaphore an. Aber die Prozesse zum Senden und zum empfangen sind komplette getrennte Thread-Pools. 

Sobald eine Nachricht empfangen wird, wird von dem empfangenden Thread die zentrale Semaphore informiert, dass wieder ein Slot frei ist. Daraufhin kann ein Thread aus dem Sende-Pool loslaufen. Aber die Abarbeitung des Empfang-Threads wird nicht blockiert.


----------



## Standardw (15. Okt 2020)

Die enge Kopplung kommt daher, dass ich versuche, die Begrenzung in den bisherigen Ablauf einzuklinken. Kommt eine Antwort, bedeutet das ja, dass Kapazitäten frei sind; also kann wieder geprüft werden. Dass es im gleichen Prozess ist, hat sich "so ergeben", allerdings dient das auch der Skalierung - dachte ich. Schließlich wird eine Antwort immer automatisch lastverteilt, und somit auch der anschließende Prüfvorgang.

Das System ist schon recht groß und in der Struktur mehr oder minder vorgegeben; außerdem bin ich zeitlich stark eingeschränkt und deshalb fällt es mir ehrlich gesagt schwer, in dieser kurzer in Zeit Java Spring, Microservices und Concurrent Programming durchzusteigen.

Du schlägst eine zentrale Instanz vor - hört sich für mich schon sinnig an, nur dachte ich eben, dass zentrale Stelle nicht skaliert werden kann. Außerdem kommt hinzu, dass ich im Rahmen von Microservices und Java Spring (Boot) nun nur die einzelnen Services kenne, die durch kubernetes orchestriert werden. Wäre diese zentrale Instanz nur ein weiterer Service, welcher dann im Endeffekt auf eine Instanz beschränkt ist?

Danke für deine Antwort!


----------



## LimDul (15. Okt 2020)

Du kommst ja nie ohne "zentrale" Instanzen aus, sobald du Daten hast. Eine DB ist (aus Sicht eines Services) immer eine zentrale Instanz - auch wenn sie darunter vielleicht repliziert ist.

Gleiches hier, du brauchst auf den ersten Blick einen Service der die Kapazität verwaltet. Ob man den jetzt skaliert oder nicht ist aus Sicht des Consumers erst mal egal. Der verwaltet nur die verwendete Kapazität. 

Was du versuchst widerspricht meines Erachtens der Mikroservice Architektur. Der empfangende Service ist bei dir auch gleichzeitig fürs Senden zuständig - das ist nicht mehr Mikro sondern am Ende in Monolith, der Lastverteilt ist.


----------



## Standardw (16. Okt 2020)

Ich habe nochmal darüber nachgedacht:


> Sobald eine Nachricht empfangen wird, wird von dem empfangenden Thread die zentrale Semaphore informiert, dass wieder ein Slot frei ist. Daraufhin kann ein Thread aus dem Sende-Pool loslaufen. Aber die Abarbeitung des Empfang-Threads wird nicht blockiert.


_Eigentlich _ist meine Idee genau das - ich habe vielleicht ein paar Details ausgelassen: Kommt eine Antwort rein, wird diese vom Kommunikations-Service an den Nachrichten-Service weitergeleitet. Dort soll die Prüfung durchgeführt werden (bisher wurden alle Nachrichten auf einmal gesendet, und die Antwort dort nur gespeichert, weiter nichts). Dürfen Nachrichten gesendet werden, werden die Nachrichten an den Kommunikations-Service weitergeleitet, der das konkrete Kommunikationsprotokoll implementiert und die Nachrichten dann an die Geräte sendet.

Entspräche das dem Konzept besser? Wie gesagt, diese Struktur besteht bereits - bis auf die Prüfung und das erneute Senden.
Der Nachrichten Service wäre dann ja die zentrale Prüfstelle. Diese kann dann ja auch skaliert werden.

-> Meine ursprüngliche Frage bezieht sich dann also auf diese Prüfstelle: Wie baue ich diese so, damit sie skalierungsfähig ist? Auch hier kann es ja passieren, dass zwei Nachrichten gleichzeitig reinkommen und somit 2x der Prüfvorgang gestartet wird. Generell kann ich den Prüfvorgang ja nicht wirklich parallelisieren, da beim Prüfen über alle Geräte-Statusmeldungen geschaut werden muss, und dann erst entschieden werden kann, welche Geräte eine Nachricht bekommen; das eigentliche Versenden ist dann natürlich parallel möglich.

Meine Frage lautet dann also möglicherweise: Wie konzeptioniere ich die zentrale Prüfstelle so, dass sie threadsafe und skalierungsfähig ist?


----------



## mihe7 (17. Okt 2020)

Irgendwie habe ich kein klares Bild des Gesamtsystems vor Augen:

Es gibt einen Service, der Nachrichten an Embedded Devices versendet. Die Antwort dieser Geräte werden durch ein ominöses "System" an "den" Service weitergeleitet. Wenn die Antwort ankommt - wo auch immer - werden wiederum Geräte benachrichtigt - worüber, weiß der Teufel. Dann darf die Infrastruktur nicht ausgelastet werden, was ich schon gar nicht verstehe, gleichzeitig gibt es einen Ansatz, bei dem die Auslastung maximal wird. Außerdem entstehen 100-te von Prozessen, die auf irgenwas warten müssen, ansonsten ist der Faktor Zeit irgendwo gar kein Thema... Ich blick da nicht durch


----------



## Hutzli (17. Okt 2020)

Hallo

Nur kurz für's Verständnis:
Du willst Nachrichten senden und zwar nur eine bestimmte Menge und die Microservice-Instanzen sind skalierbar?

In dem Fall:
Mache eine Tabelle "JobRunLog" in deiner MongoDB. Darin speicherst du den Jobname als ID (ev. RESPONSE_JOB) und ein Flag, ob dieser Job von einer Microservice-Instanz abgearbeitet wird.
Zusätzlich: eine Version-Column! Für Skalierung wichtig.

Danach machst du das Mapping in Java (keine Ahnung von Spring, ist wohl was mit Repos) und machst eine ResponseJob-Klasse. Dort machst du ne Methode, welche z.B. alle 10 Sekunde  aufgerufen wird (ein Job halt):
Scheduled

Dann läuft also alle 10 Sekunden ein Job an. Nun machst du eine Klasse ResponseService (beinhaltet Business-Logik) mit einer public-Methode (ev. sendResponses()). Dort sendest du halt deine Nachrichten .. kannst ja in der DB in einer Tabelle einen Counter führen, wieviele Nachrichten gesendet werden dürfen oder setzt ein Flag in deiner Embedded-Tabelle oder wie auch immer.

Letztes Problem: Skalierung
Dazu gibts das Version-Atrribut in der DB-Tabelle JobRunLog und das Flag.
Mache einen Service "JobService", der eine Methode 'lockJob(job: Job): boolean' hat. Achtung: diese Methode führt eine andere Methode 'lock(job: Job): boolean' in einer neuen Transaktion aus!
@Transactional (für Spring gibts das sicher auch, beachte aber, dass du diese Methode nicht einfach normal aufrufen kannst, da noch ein Proxy drumumgebaut werden muss.)


```
public class JobService {
    public boolean lockJob(Job job) {
        try {
            return lock(job); // hier speziell aufrufen, nicht so wie ichs mache!
        } catch(Exception ex) {
            return false;
        }
    }

    public void unlockJob(Job job) {
        JobRunLog entity = entityManager.find(JobRunLog.class, job.name());
    entity.setRunning(false);
    }
    
// Diese Annotation für Spring googlen
@Transactional(TxType.REQUIRES_NEW)
    private boolean lock(Job job) {
        JobRunLog entity = entityManager.find(JobRunLog.class, job.name());
    if (! entity.isRunning()){
        entity.setRunning(true); // wirft exception, wenn Version nicht mehr Neustes, dann läuft eine
// andere MS-Instanz, die zur selben Zeit versucht zu locken.
        return true;
    }
    return false;
    } 
}
```
In der ResponseJob Klasse probierst du den Job zu locken. Wenn das geht, dann kannst du deine Nachrichten senden und am Ende das Lock freigeben (running flag auf false setzen).
Wenn du nicht locken kannst, dann läuft bereits ein anderer Job, der Nachrichten verschickt und du machst einfach nichts:


```
@Scheduled(...)
public void job() {
    boolean locked = jobService.lockJob(Job.RESPONSE_JOB);
    if(locked) {
        service.sendResponses();
        jobService.unlockJob(Job.RESPONSE_JOOB);
    }
}
```

Ich hoffe, das gibt dir einen besseren Ansatz. Damit wäre Senden/Empfangen getrennt und das Senden ist skalierbar über mehrere Pods/Container hinweg.


----------

