# Parsen von selbstdefinierten Messages



## gest01 (11. Nov 2007)

Hi

Hab da ein kleines Problem.
Arbeite im Moment an einem Client/Server-Programm. Für die Kommunikation habe ich ein eigenes Protokoll entwickelt (über TCP/IP) das sehr ähnlich dem HTTP ist. Dazu habe ich einen Satz von Message-Klassen definiert um die einzelnen Kommandos sauber zu kapseln. Diese Messages werden dann zwischen den Clients und dem Server ausgetauscht bzw. gesendet ;-). Aus den Messages erzeuge ich dann ein Byte-Array und sende diesen Array über den OutputStream.

Die Messages sind wie folgt aufgebaut:

* | ClientID (1 Byte) | Type (1 Byte) | Length (4 Byte) | Data (N Bytes) | *

Dies funktioniert soweit so gut. Das Problem beginnt bei hoher Last, wenn viele Clients viele Messages senden. Auf der Server-Seite (und natürlich auch auf der Client-Seite) lese ich die Bytes aus dem Socket mit 
	
	
	
	





```
inputstream.read()
```
. Das heisst, zuerst wird nur der Header (Total 6 Bytes) ausgelesen danach anhand des
*Length-Feldes* die Daten-Bytes. Ich muss bei dieser Methode davon ausgehen, dass ich immer auch wirklich den Header auslese und nicht sonstigen Datenmüll. Dieses Vorgehen ist nicht gerade gut.

Stattdessen wäre es besser den Messages eine Anfangs- (Präambel) und evtl eine Endekennung mit zugeben, damit ich weiss wo es beginnt und endet. So, nun die Fragen   

1. Wie sollte so eine Präambel aufgebaut sein? (Das Problem ist ja, das im Data-Field jedes mögliche Byte stehen kann)
2. Welche Art von StreamReadern muss ich verwenden für den InputStream und OutputStream des Sockets? Buffered?
3. Hat jemand schon mal was ähnliches gemacht, bzw. evtl sogar ein Beispiel


Danke für eure Hilfe

Gruss


----------



## anfänger15 (11. Nov 2007)

Hier ein chatbeispielwww.java-forum.org/de/viewtopic.php?t=6033

Wenn es nur um reine Text-Messages geht würde ich dir einen BufferedReader empfehlen und dort mit readLine immer eine Zeile einlesen, die die information enthält wer die Message gesendet hat und mit einem Trennzeichen von der eigentlichen Message trennt (sowie es im chat gemacht wird). 

Wenn es jedoch um binärdaten geht dann vergiss alles was ich geschrieben hab


----------



## gest01 (11. Nov 2007)

Danke für die Antwort.

Es handelt sich ausschliesslich um Binärdaten.


----------



## seejay (12. Nov 2007)

ich weiß nicht ob es dir hilft, aber es gibt ein Bit-Stuffing. Am Anfang und am Ende jeder Nachricht wird dieses Stück eingefügt
01111110
und dazwischen werden nach 5x1er immer eine 0 eingefügt, damit diese Sequenz nicht zufällig einmal im Text vorkommen kann
das heißt aus
111111 -> 1111101
111110 -> 1111100
etc..
Vllt kannste des gebrauchen


----------



## gest01 (12. Nov 2007)

Vielen Dank für den Hinweis.

Werde mich mal schlau machen. 
Hat den hier noch nie jemand ein eigenes Protokoll implementiert?
Bzw. Wie wird das denn bei TCP gemacht?

Gruss


----------



## tuxedo (12. Nov 2007)

Also ich weiß vom Protokoll des Lineage2 MMORPG-Serverprojekts, dass das dort ähnlich gemacht wird. Ein Start und Stop-Flag wird dort AFAIk nicht benutzt. 

Du könntest aber eine Prüfsumme des kompletten pakets errechnen (crc, md5, ...) und mit anhängen. Dann kannst du Anhand der Prüfsumme vergleichen ob die Daten mit der Prüfsumme übereinstimmen.


----------



## gest01 (12. Nov 2007)

Oder anders gefragt:
Wie würdet Ihr Messages zwischen Server und Client austauschen?


----------



## tuxedo (12. Nov 2007)

So wie du es beschrieben hast. Hab damit ein eigenes VoIP-Protokoll aufgesetzt. Funktioniert prima. TCP kümmert sich schon recht gut drum dass die Pakete auch ja ankommen.

- Alex


----------



## gest01 (12. Nov 2007)

Wie versendest Du bzw. empfängst Du die Pakete?
Welche Stream-Typen verwendest Du?


----------



## tuxedo (12. Nov 2007)

Die verwendete Stream-Klasse ist "wurscht". Ganz unten liegt so oder so "InputStream" und "OutputStream". Darauf baust du halt mit dem auf, was du brauchst. Willst recht einfach ints, shorts, etc. versenden, setzt du nen DataOutputStream auf den OutputStream auf. Um die latenzen niedrig zu halten (bei VoIP sehr wichtig) und meinen Codec nicht "durcheinander" zu bringen, verwende ich keinen Buffered*Stream.

Wie ist die Frage "Wie versendest du?" gemeint?????

Na ich schreibe auf den OutputStream drauf. Und schwups sausen die Pakete durch die Leitung. 


- Alex


----------



## gest01 (12. Nov 2007)

Damit meinte ich ob du deine Packete evtl Serialisierst und einen ObjectOutputStream verwendest.
Meine Messages erben alle von derselben Basisklasse. Die Informationen (Strings, Ints etc) der Messages werden direkt in ein Byte-Array geschrieben und dann direkt an den OutputStream übergeben.


----------



## gest01 (12. Nov 2007)

Damit meinte ich ob du deine Packete evtl Serialisierst und einen ObjectOutputStream verwendest.
Meine Messages erben alle von derselben Basisklasse. Die Informationen (Strings, Ints etc) der Messages werden direkt in ein Byte-Array geschrieben und dann direkt an den OutputStream übergeben.


----------



## tuxedo (12. Nov 2007)

Bei VoIP gibts nix zu serialisieren. Das würde wohl auch zu lange dauern. Ich hab nackige byte[]'s die ich mit Client-Daten (ID, Zeitstempel etc) anreichere. Fertig.

Wie du das bei dir machst, musst du selbst rausfinden. Dazu gibts kein Rezept.

- Alex


----------



## gest01 (12. Nov 2007)

Genau so mache ich das auch, bzw. möchte ich das machen   
Kannst Du mir den Teil zeigen, wo du deine Packete empfängst?


----------



## gest01 (12. Nov 2007)

Hier mal meine Empfangs-Routine (läuft in einem Thread):


```
while(!interrupted())
			{
							
				// Read telegram header...
				byte[] telegramHeader = new byte[TelegramBase.HEADER_LENGTH];				
				this.in.read(telegramHeader);	
				
				// Get entire telegram length
				int telegramDataLength = ByteBuffer.wrap(telegramHeader, 2, 4).getInt();
				
				// Read telegram data (without header)
				byte[] telegramData = new byte[telegramDataLength - TelegramBase.HEADER_LENGTH];
				this.in.read(telegramData, 0, telegramData.length);	
				
				// Build byte array.. Header Length + dataLength
				byte[] telegram = new byte[telegramData.length + TelegramBase.HEADER_LENGTH];	
				
				// Copy Header and Data arrays into telegram
				System.arraycopy(telegramHeader, 0, telegram, 0, TelegramBase.HEADER_LENGTH);
				System.arraycopy(telegramData, 0, telegram, TelegramBase.HEADER_LENGTH, telegramData.length);
			
				onTelegramReceived(TelegramBase.create(telegram));	
				
			}
```


----------



## tuxedo (13. Nov 2007)

Du stellst dich an. Aber hier, bitte, glaub nicht dass dich das weiter bringt:


```
/**
	 * reads the next package from the inputstream
	 * @throws IOException
	 */
	public synchronized void readPacket() throws IOException{	
		mSenderId = dis.readInt();
		mTimestamp = dis.readLong();
		mLength = dis.readShort();
		data = new byte[mLength];
		dis.readFully(data, 0, data.length);
	}
```

Bei mir arbeiten gleich mehrere Threads am kompletten lesen der Daten. Einer wacht auf jeder Socketverbindung. Da bei VoIP in einer Konferenz die Sprachdaten verteilt werden müssen, signalisiert jeder Socket-Thread der Konferenzklasse: Hier, ich hab Daten für dich (wird mit available() geprüft), und die Konferenzklasse liest das Paket vom jeweiligen Socket und verteilt es an die anderen Konferenzteilnehmer. Den Code diesbezüglich kann ich nicht online stellen. Zum einen wäre es "etwas" zu viel, zum anderen hat das meine Firma nicht so gerne ... Du vestehst was ich meine ;-) ?

Du siehst: Es gibt sicher 1000. verschiedene Möglichkeiten das "drum rum" zu realisieren. Dabei kommts nunmal auch drauf an, ob man TCP oder UDP verwendet, ob man Zeitkritische Daten hat (z.B. audio streaming) und welche Latenz und Bandbreite einem zur verfügung stehen bzw. eingehalten werden müssen oder sollen.

- Alex


----------



## gest01 (13. Nov 2007)

Habs endlich hingekriegt   Danke für deinen Input!

```
package jdmslib.transportlayer;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

import jdmslib.telegram.TelegramBase;
import jdmslib.threading.BaseThread;
import jdmslib.util.Logger;


public class TransportLayerThread extends BaseThread 
{
	private IncomingTelegramListener incomingCallback;
	private InputStream in;
	
	public TransportLayerThread(InputStream in, IncomingTelegramListener callback)
	{
		super("TransportLayerThread");

		this.in = in;
		this.incomingCallback = callback;
	}
	
	@Override
	public void run()
	{
		try
		{			
			while(!interrupted())
			{
				// Read telegram header...
				byte[] telegramHeader = new byte[TelegramBase.HEADER_LENGTH];				
				int readHeaderLength = this.in.read(telegramHeader, 0, TelegramBase.HEADER_LENGTH);
				
				// Check if all necessary header bytes were received...
				if (readHeaderLength < TelegramBase.HEADER_LENGTH)
					pending(TelegramBase.HEADER_LENGTH, readHeaderLength, telegramHeader);
				
				// Get entire telegram length
				int telegramDataLength = ByteBuffer.wrap(telegramHeader, 2, 4).getInt();
				
				// Get data length. Calculated from Header...
				int dataLengthExpected = telegramDataLength - TelegramBase.HEADER_LENGTH;
				
				// Read telegram data (without header)
				byte[] telegramData = new byte[dataLengthExpected];
				int dataLengthEffective = this.in.read(telegramData, 0, dataLengthExpected);	

				// Check if all necessery data bytes were received...
				if(dataLengthEffective < dataLengthExpected)
					pending(dataLengthExpected, dataLengthEffective, telegramData);
				
				// Build byte array.. Header Length + dataLength
				byte[] telegram = new byte[telegramData.length + TelegramBase.HEADER_LENGTH];	
				
				// Copy Header and Data arrays into telegram
				System.arraycopy(telegramHeader, 0, telegram, 0, TelegramBase.HEADER_LENGTH);
				System.arraycopy(telegramData, 0, telegram, TelegramBase.HEADER_LENGTH, telegramData.length);
			
				// Notify listener...
				onTelegramReceived(TelegramBase.create(telegram));	
				
			}
		}
		catch(Exception ex)
		{
			getUncaughtExceptionHandler().uncaughtException(this, ex);
		}
	}
	
	private void pending(int expectedSize, int effectiveRead, byte[] buffer) throws IOException
	{
		while(effectiveRead < expectedSize)
		{
			int stillNeeded = expectedSize - effectiveRead;
			//Logger.logNormal("pending", getName(), "Expected:%d, EffectiveRead:%d, StillNeeded:%d", expectedSize, effectiveRead, stillNeeded);

			byte[] tmp = new byte[stillNeeded];
			int rest = this.in.read(tmp, 0, stillNeeded);	
			System.arraycopy(tmp, 0, buffer, effectiveRead, rest);
			effectiveRead += rest;
		}
	}
	
	/**
	 * Notifies the application layer about incoming telegrams.
	 * @param incomingTelegram Received telegram from JDMS-Server.
	 */
	private void onTelegramReceived(TelegramBase incomingTelegram)
	{
		if(this.incomingCallback != null)
		{
			if (incomingTelegram != null)
			{
				this.incomingCallback.onIncomingTelegram(incomingTelegram);
			}
		}
	}
}
```


----------



## Gast (14. Nov 2007)

kgkgk


----------

