# Mehrere Streams durch einen Stream senden



## VdA (5. Mrz 2008)

Hi!
Mein Problem:
Ich habe einen Server und einen Client.
Zwischen ihnen sollen verschiedene Daten versendet werden und zwar:
1. gleichzeitig
2. an verschiedene Threads

erster Lösungsansatz:
Für jeden Thread eine neue Verbindung aufbauen -> schwachsinn bei der Kommunikation übers Internet, denn was ist wenn sich zwei clients, die hinter dem selben Router hängen, gleichzeitig sich mit  dem Server verbinden? - Totales Stromwirrwarrr

zweiter Lösungsansatz:
Alles durch eine Verbindung(Object(In/Out)putStream) mit Datenpacket-Objekten senden.
Die Datenpakete enthalten einen Namen, an dem der andere Feststellen kann wofür die Daten sind, und eine byte Array mit den Daten.

Das ganze wird dann mit einer Klasse "TrichterOutput" zusammen versendet:

```
public class TrichterOutputStream extends OutputStream 
{
	final ObjectOutputStream oout;
	public TrichterOutputStream(ObjectOutputStream out) 
	{
		this.oout=out;
	}

	@Override
	@Deprecated
	/**
	 * Does nothing!
	 * Please use getOutputStream(String name) instead.
	 * */
	public void write(int arg0) throws IOException 
	{

	}
	public OutputStream getOutputStream(final String name) throws IOException
	{
		//creating Pipes and connect them 
		PipedOutputStream out=new PipedOutputStream();
		final PipedInputStream in=new PipedInputStream(out);
		
		Thread lesen=new Thread(new Runnable()
		{
			public void run()
			{
				while(true)
				{
					try {
						byte[] buffer=new byte[1024];
						in.read(buffer);
						oout.writeObject(new DataPacket(name, buffer));
					} catch (IOException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
				}
			}
		});
		lesen.setName("TrichterOutputStream$lesen");
		lesen.start();
		return out;
	}
}
```


Und auf der anderen Seite wieder getrennt:

```
public class TrichterInputStream extends InputStream 
{
	final ObjectInputStream in;
	HashMap<String, PipedOutputStream> outputstreams=new HashMap<String, PipedOutputStream>();
	HashMap<String, PipedInputStream> inputstreams=new HashMap<String, PipedInputStream>();
	public TrichterInputStream(ObjectInputStream in1)
	{
		this.in=in1;
		Thread lesen=new Thread(new Runnable()
		{
			public void run()
			{
				while(true)
				{
					try {
						DataPacket datapacket = (DataPacket) in.readObject();
						if(outputstreams.get(datapacket.getName()) != null)
						{
							outputstreams.get(datapacket.getName()).write(datapacket.getData());
						}
						else
						{
							//creating Pipes and connect them 
							PipedOutputStream out=new PipedOutputStream();
							PipedInputStream in=new PipedInputStream(out);
							
							//adding them to the list
							outputstreams.put(datapacket.getName(), out);
							inputstreams.put(datapacket.getName(), in);
						}
					} catch (IOException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					} catch (ClassNotFoundException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
				}
			}
		});
		lesen.setName("TrichterInputStream$lesen");
		lesen.start();
	}
	
	@Override
	@Deprecated
	/**
	 * Will always return -1.
	 * Please use getInputStream(String name).
	 * */
	public int read() throws IOException 
	{	
		return -1;
	}
	
	public InputStream getInputStream(String name) throws IOException
	{
		if(outputstreams.get(name) == null)
		{
			//creating Pipes and connect them 
			PipedOutputStream out=new PipedOutputStream();
			PipedInputStream in=new PipedInputStream(out);
			
			//adding them to the list
			outputstreams.put(name, out);
			inputstreams.put(name, in);
		}
		return inputstreams.get(name);
	}
}
```

Wie ihr sehen könnt ist das sozusagen ein Stream den man mit mehreren Streams gleichzeitig verketten kann.

Mein Problem besteht nun, dass der Thread auf der lesenden Seite an folgender Stelle nicht weiter läuft:

```
TrichterInputStream tin;
//....
//....
//....
ObjectInputStream oin=new ObjectInputStream(tin.getInputStream(type));
```
Ein Blick in den Stack sagt mir das der ObjectInputStream auf den Header des Streams wartet und der einfach nicht ankommt.

Was kann ich machen damit der Thread da weiterläuft?
Ich mein  tin.getInputStream(type) liefert ja ein PipedInputStream da is doch schon ein Header drin oder?

THX for help


----------



## tuxedo (6. Mrz 2008)

Moin Moin,

wieso macht du's so kompliziert? 

Was hälst du von einer "Sende-Queue"?

Deine einzelnen Programmthreads welche senden wollen rufen in einem zentralen SendeThread eine Methode auf und übergeben das zu sendende Objekt. In diesem Sendethread landet erstmal alles in einer Queue. Un über eine while() Schleife in der run() Methode sendest du alles was im Puffer/Queue vorhanden ist.

Das geht natürlich nur, solange du auf Streams in deinen einzelnen Programmthreads verzichten kannst. 

Ach ja: Der "Rückwärtsweg" fehlt ja noch. Okay. Da hab ich auch was:

Du brauchst noch einen Empfangsthread. Deine einzelnen Programmthreads welche auch Daten empfangen können sollen, registrieren sich beim Empfangsthread mit Ihrer ID oder ihrem Namen. Der Empfangsthread holt zyklisch Objekte aus dem ObjectInputstream und liest aus den Objekten den Empfänger aus. Dann wird geschaut ob sich dafür ein Programmthread registriert hat. Wenn ja, wird das Objekt an den entsprechenden Programmthread übergeben.

Das hat sogar den Vorteil dass du das Objekt an mehrere, und nicht nur einen Programmthread schicken kannst.

- Alex

P.S. Was für Daten schickst du denn? Wenns nicht gerade Binärdaten sind (Files): Wäre RMI oder SIMON (siehe links in meiner Signatur) nicht ne "einfachere" Lösung??


----------



## VdA (6. Mrz 2008)

Mein Problem, und auch der Grund warum ich es so kompliziert mache, ist dass ich über diese Leitung Binärdaten (Bilder, Dateien) und Textnachrichten (mit Reader und Writer) versenden will.
Dabei kann es vorkommen dass zwei Threads Bilder empfangen wollen aber halt nicht das gleiche sondern unterschiedlicheBilder.
Von daher kann ich nich einfach nach Datentyp sortieren.
Die Idee mit der EventQueue halt ich für ganz sinnvoll ich werd mir mal überlegen ob ich die weiter verfolge wenn ich das so wie es jetzt ist nicht verwirklicht kriege.
Ich fand einfach die Idee praktisch, dass sich im Prinzip nichts an dem eigentlichen Programm ändert, wenn ich anstatt von mehreren Verbindungen einfach einen TrichterStream zwischenschalte.
Mal sehn aber trotzdem wär ich dankbar wenn ihr mir nochmal ein Tip gebt wie ich das obere zum laufen kriege da es für mich weniger Arbeit ist da noch ein paar Zeilen code einzufügen, damit es läuft als das ganze jetzt auf eine EventQueue mit listeners umzubauen.


----------



## tuxedo (6. Mrz 2008)

Okay, RMI und SIMON fallen nun raus.

Dein obiger Ansatz:

Naja. Ich habs nicht ausprobiert. Aber ich tippe da mal auf noch mehr "Fehlerquellen". Was mit zum Beispiel auffällt: 

Das Schreiben in den OOS beim TOS ist nicht "synchronisiert". Weiß nicht ob der OOS da selbst synchronisiert. Glaub's aber fast nicht. 

Du musst das TIS und TOS Prinzip ja nicht gleich ganz verwerfen. Du kannst mittels den Pipes auch das Queue Prinzip damit umsetzen:

Du lässt das nach wie vor so, dass man vom TOS und TIS sich eien Stream holen kann. Nur schreibt der Stream nicht gleich auf den OOS, bzw ließt davon, sondern die Daten wandern erstmal in eine Queue wo "EIN" zentraler Thread empfängt, bzw. ließt. Die Daten werden beim TIS im Empfangsthread einfach wieder gelesen, ausgewertet und dann in die entsprechende Pipe geschickt. 

Die einzigste "Schwierigkeit" liegt darin, beim TOS die einzelnen Pipe-Streams zu "überwachen" und die Daten in die Queue zu schieben. 

- Alex


----------



## VdA (6. Mrz 2008)

danke das könnts sein.
Das hab ich ganz übersehen, dass ich ja aus mehreren Threads in den Stream schreibe :autsch: 
ich guck mir das jetzt mal an.


----------



## tuxedo (6. Mrz 2008)

Hab mir gerade mal die "Mühe" gemacht deinen Code zu testen.

Also bei mir geht's prima via Localhost.

Serveranwendung sieht so aus:


```
erverSocket s = new ServerSocket(3001);
		Socket accept = s.accept();
		
		TrichterInputStream tis = new TrichterInputStream(new ObjectInputStream(accept.getInputStream()));
		
		InputStream inputStream = tis.getInputStream("test");
		
		while (true){
			byte[] b = new byte[3];
			inputStream.read(b);
			System.out.println(new String(b));
		}
```


Client so:


```
Socket s = new Socket("localhost",3001);
		
		TrichterOutputStream tos = new TrichterOutputStream(new ObjectOutputStream(s.getOutputStream()));
		
		OutputStream outputStream = tos.getOutputStream("test");
		
		while(true){
			byte[] b = new String("abc").getBytes();
			outputStream.write(b);
		}
```

Die Data-Klasse hab ich ganz "primitiv" nachgebaut.


----------



## SebiB90 (6. Mrz 2008)

@alex
das problem war, das wenn man getInputStream() einem ObjectInputStream übergibt, es nicht funktioniert und nicht wenn der Trichter ein ObjektInputStream ist.


----------



## VdA (6. Mrz 2008)

ich hab mir das mal angesehen und festegestellt, dass das Problem nur auftritt wenn ich ans Ende einen ObjectIn/OutputStream ranhänge.  
Bei normalen (Buffered)In/OutputStreams funktioniert alles.
Warum nur ???:L


----------



## tuxedo (6. Mrz 2008)

Das liegt wohlmöglich an den Stream-Typen.

Wenn du "außen" einen Object*Stream verwendest, wird davon ausgegangen, dass an der gegenüberliegenden Seite des Streams auch ein solcher Object*Stream hängt.

In deiner Konstellation geht das aber nicht, da das ganze durch Pipes und das Transport-Objekt unterbrochen ist.

Und warum geht das nicht?

Ganz einfach: Object*Streams kommunizieren miteinander und tauschen direkt Daten aus. Du _musst_ auf beiden Seiten einen korrespondierenden Object*Stream haben. Sonst geht's nicht.

Noch was:
Object*Streams machen ein "übles" caching. Wenn du nicht ab und zu mal reset() am Stream aufrufst läuft die irgendwann der Speicher inder JVM voll. 

Würde das ganz ohne Object*Streams machen. Du kannst dir dann auch das mit dem Transportobjekt sparen. Du definierst einfach, dass die ersten x Bytes die durch den Trichter im inneren gehen, die ID des Streams ist. Kannst ja 4 Bytes nehmen und damit ein Integer auf Byte-Ebene abbilden. 

Dann hast du das "langsame" serialisieren weg. Und wenn du Files schickst, solltest du diese Ebenfalls nicht mit Object*Streams serialisieren. Denn da wird viel Reflection-Aufwand betrieben um die Objekte in Bytes zu wandeln und umgekehrt.

Schicke deine Bilder lieber mit "nackigen" Input/OutputStreams. Das geht schneller und erspart dir dann auch den lästigen Cache der Object*Streams.

- Alex


----------



## VdA (6. Mrz 2008)

also wenn ich mir das ganze Prog so angucke könnte man das auch ohne ObjectStreams machen...... :### 
nur wär das Schade, wenn man über meine TrichterStreams keine Objecte versenden könnte, da ich die bestimmt noch öfter in anderen Programmen gebrauchen könnte.

Aber der Tip mit den Bildern ist gut.Ich hab mich schon gefragt, warum das immer so lange dauert bis so ein Bild rüber ist


----------



## tuxedo (6. Mrz 2008)

Du kannst dennoch Objekte schicken. Allerdings "solltest" du den Umweg dann über einen ByteBuffer gehen. Du kannst da Objekte mit  dem OOS reinschreiben und dann als byte[] wieder rausholen, welche du dann über deinen Trichter senden und auf der anderen Seite wieder genau so zurückbauen kannst. 

"Seltsamerweise" geht das ohne direkte Verbindung zwischen OOS und OIS ...

Aber wie gesagt: mit OOS serialisieren ist "aufwendig" und wird mit steigender Objekttiefe (viel Vererbung, viele Interfaces, ...) immer schlimmer.

- Alex


----------



## VdA (6. Mrz 2008)

mach ich das nicht schon so in TrichterInputStream:

```
byte[] buffer=new byte[1024];
in.read(buffer);
pakete.add(new DataPacket(name, buffer));
```

und TrichterOutputStream?:

```
outputstreams.get(datapacket.getName()).write(datapacket.getData());
```
ich mein der Stream wird in ein byte[] geschrieben und das wird später wieder in einen Stream geschrieben ???:L

Sollte dann der header nicht auch mit reinkopiert werden?


----------



## VdA (7. Mrz 2008)

keiner ne antwort :cry: ?


----------



## tuxedo (9. Mrz 2008)

Ich weiß es nicht. Ich weiß nur dass das serialisieren in einen ByteBufferStream funktioniert. Vermutlich wird da irgendwas erhalten (stream-informationen?) was bei normalen byte[]'s wegfällt.

Probier doch mal die Sache mit dem ByteBuffer ...


- Alex


----------



## VdA (15. Mrz 2008)

ich weiß jetzt woran es lag.

```
DataPacket datapacket = (DataPacket) in.readObject();
						if(outputstreams.get(datapacket.getName()) != null)
						{
							if(datapacket.getData() instanceof byte[])
								outputstreams.get(datapacket.getName()).write((byte[])datapacket.getData());
							else
								objectlisteners.get(datapacket.getName()).objectRecieved(datapacket.getData());
						}
						else
						{
							//creating Pipes and connect them 
							PipedOutputStream out=new PipedOutputStream();
							PipedInputStream in=new PipedInputStream(out);
							
							//adding them to the list
							outputstreams.put(datapacket.getName(), out);
							inputstreams.put(datapacket.getName(), in);
							
//Die zeile hier hatte ich vergessen in TrichterInputStream
							outputstreams.get(datapacket.getName()).write((byte[])datapacket.getData());
						}
```

*in die Ecke leg und schäm* 

EDIT: *wieder aus der Ecke rauskomm, weils immer noch nicht geht*


----------

