Verständnisproblem bei AIO

Xeonkryptos

Bekanntes Mitglied
Moin liebe Community,

ich habe wohl ein Verständnisproblem mit der Funktionalität bzw. dem Aufruf von der read()-Methode vom [JAPI]AsynchronousSocketChannel[/JAPI], da es dort heißt
This method may be invoked at any time.

Darauf aufbauend hab ich eine eigene Klasse vom Typ Server und Client geschrieben, die zu Beginn nach dem Verbindungsaufbau 1x die Methode aufrufen, damit der CompletionHandler beim erfolgreichen oder fehlgeschlagenen Lesen speziell darauf reagieren kann. Nun liest der Client und der Server nur eine einzige Nachricht und danach gar keine mehr, obwohl ich sichergestellt habe, dass beide jeweils sich Nachrichten schicken ohne Fehlermeldung! ???:L

Some channel types may not allow more than one read to be outstanding at any given time.

Jetzt vermute ich mal, dass es entweder hiermit etwas zu tun hat oder ich irgendwo einen Verständnisfehler gemacht habe.

Hier mein Client:
Java:
public class AioClient implements Runnable {

	private Object attachment;
	private CompletionHandler<Void, Object> handler = new CompletionHandler<Void, Object>() {

		@Override
		public void completed(Void result, Object attachment) {
			write();
			server.read(readBuffer, AioClient.this.attachment,
					new CompletionHandler<Integer, Object>() {

						@Override
						public void completed(Integer result, Object attachment) {
							System.out.println(new String(readBuffer.array()));
						}

						@Override
						public void failed(Throwable exc, Object attachment) {
							exc.printStackTrace();
						}
					});
		}

		@Override
		public void failed(Throwable exc, Object attachment) {
			exc.printStackTrace();
		}

	};

	private AsynchronousSocketChannel server;
	private ByteBuffer readBuffer = ByteBuffer.allocate(8192);
	private List<AsynchronousSocketChannel> clientList = new LinkedList<AsynchronousSocketChannel>();

	public AioClient() throws IOException {
		server = AsynchronousSocketChannel.open();
		server.connect(new InetSocketAddress("localhost", 3141), null, handler);
	}

	@Override
	public void run() {
                // Überprüfung, ob der Server Nachrichten empfängt
		Scanner scan = new Scanner(System.in);
		String msg;
		while (true) {
			System.out.println("Schreib eine Nachricht");
			msg = scan.nextLine();
			write(msg, null);
		}
	}

	private void write(String msg) {
		server.write(ByteBuffer.wrap(msg.getBytes()), null,
				new CompletionHandler<Integer, Object>() {

					@Override
					public void completed(Integer result, Object attachment) {
						System.out.println("Nachricht gesendet.");
					}

					@Override
					public void failed(Throwable exc, Object attachment) {
					}
				});
	}
}

und der Server:
Java:
public class AioServer implements Runnable {

	private AsynchronousServerSocketChannel client;
	private ByteBuffer readBuffer = ByteBuffer.allocate(8192);
	private List<User> clientList = Collections
			.synchronizedList(new ArrayList<User>());
	private Map<User, AsynchronousSocketChannel> clientMap = Collections
			.synchronizedMap(new HashMap<User, AsynchronousSocketChannel>(50));
	private CompletionHandler<Integer, Object> getMessageHandler = new CompletionHandler<Integer, Object>() {

		@Override
		public void completed(Integer result, Object attachment) {
			try {
				System.out.println("Nachricht empfangen.");
				read();
			} catch (InterruptedException e) {
				e.printStackTrace();
			} catch (ExecutionException e) {
				e.printStackTrace();
			}
		}

		@Override
		public void failed(Throwable exc, Object attachment) {
			System.out.println("Konnte Nachricht nicht lesen.");
			exc.printStackTrace();
		}
	};
	private CompletionHandler<AsynchronousSocketChannel, Object> acceptConnHandler = new CompletionHandler<AsynchronousSocketChannel, Object>() {

		@Override
		public void completed(AsynchronousSocketChannel result,
				Object attachment) {
			client.accept(null, this);
			try {
				accept(result);
			} catch (InterruptedException e) {
				e.printStackTrace();
			} catch (ExecutionException e) {
				e.printStackTrace();
			}
		}

		@Override
		public void failed(Throwable exc, Object attachment) {
		}

	};

	public AioServer() throws IOException {
		client = AsynchronousServerSocketChannel.open().bind(
				new InetSocketAddress(3141));
		client.accept(null, acceptConnHandler);
	}

	@Override
	public void run() {
		while (true) {
                        // Eigenmächtiges Senden des Servers nach Pausen zur Überprüfung des Empfangs vom Client
			try {
				Thread.sleep(2000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			try {
				writeToAll("Xeonkryptos",
						"Nachricht vom Server nach Sleep".getBytes());
			} catch (InterruptedException e) {
				e.printStackTrace();
			} catch (ExecutionException e) {
				e.printStackTrace();
			}
		}
	}

	private void read() throws InterruptedException, ExecutionException {
		for (User user : clientList) {
			readBuffer.clear();
			clientMap.get(user).read(readBuffer).get(); // Hier nach Kürzung des Codes wieder unsicher, ob das so richtig ist...
			writeToAll(user.getName(), readBuffer.array());
		}
		System.out.println("Nachricht gepostet.");
	}

	private void accept(AsynchronousSocketChannel result, User user)
			throws InterruptedException, ExecutionException {
		clientList.add(user);
		clientMap.put(user, result);
		writeToAll("", ("Xeonkryptos joined the Chat.").getBytes());
		result.read(readBuffer, null, getMessageHandler);
	}

	public void writeToAll(String nick, byte[] data)
			throws InterruptedException, ExecutionException {
		if (data.length > -1) {
			byte[] msgBytes = new byte[nick.getBytes().length + data.length];
			byte[] nickByte = nick.getBytes();
			for (int i = 0; i < nickByte.length; i++) {
				msgBytes[i] = nickByte[i];
			}
			for (int i = nickByte.length; i < msgBytes.length; i++) {
				msgBytes[i] = data[i - nickByte.length];
			}
			for (User user : clientList) {
				clientMap.get(user).write(ByteBuffer.wrap(msgBytes), null,
						new CompletionHandler<Integer, Object>() {

							@Override
							public void completed(Integer result,
									Object attachment) {
								System.out.println(result + " Bytes gesendet");
							}

							@Override
							public void failed(Throwable exc, Object attachment) {
								System.out.println("Fehlgeschlagen");
							}
						});
			}
		}
	}
}
 
Zuletzt bearbeitet:

Kr0e

Gesperrter Benutzer
Also grundsätzlich zum Konzept von AIO (vieles davon hast du ja bereits):


AIO bedeutet, dass du dem OS eine Aufgabe gibst (Lesen/SChreiben) und ein Callback z.b. angibst, wo du wissen willst, wie es ausgegangen ist.

Read() kann immer ausgeführt werden, aber wenn bereits eine Read() Operation dran ist, wird eine *PendingException geworfen.

Bedenke, dass jeder Aufruf von Read() genau ein Ergebnis erzeugt. Sprich du liest in der Tat nur einmal, wenn ich das richtig gesehen hab.


Ansich läuft es so: Du rufst Read() auf und übergibts den Handler (Hier stubst du quasi die Kette an). Im Handler selber würde man dann normalerweise erneut Read() aufrufen mit "this". Damit du einer Art Rekursion hast. (Es ist keine wirkliche Rekursion, also keine Sorge wegen dem Stack ;))



Ich denke, am einfachsten wäre es, wenn ich es dir kurz zeige:


Java:
final AsynchronousSocketChannel client = ....; // Init + Co

CompletionHandler ch = new CompletionHandler<Integer, AsynchronousSocketChannel >() {
 
                    @Override
                    public void completed(Integer result, ByteBuffer attachment) {
                         // PROCESS BYTEBUFFER (Also Protokoll)
                        
                         // War alles ok ? Soll nochmal gelesen werden ? Wenn ja :
                         client.read(attachment, attachment, this); // Ich mach das grad ausm Kopf, ich hoffe die Deklaration war richtig

             
                    }
 
                    @Override
                    public void failed(Throwable exc, ByteBuffer attachment) {
                          // Fehler.. Am besten den channel schließen...
                    }
                };

// EINMALIG das ganze ins Rollen bringen....
client.read(attachment, attachment, this);


Außerdem: Bei AIO ist es durchaus konform sich für jeden CompletionHandler eine Klasse zu erstellen. Zumindest mit Infos des Channels etc.

Also in diesem Fall wäre eine Klasse "Reader" angebracht die als Attribut z.b. den Channel hat und dann iwas tolles mit den empfangenen Daten anfängt.


Vermutlich fragst du dich, was so toll an AIO ist :D Glaube mir , wenn du einmal das Konzept drin hast, willst du nie mehr was anderes ;)
 

Xeonkryptos

Bekanntes Mitglied
Danke, das hat mir jetzt weitergeholfen. :) Es funktioniert zuerst mal so, wie es soll. =)

Jetzt hab ich aber noch eine Frage: Immer wenn man einen CompletionHandler verwendet, ob beim Schreiben oder Lesen, hat man Attachment dabei und ich dachte mir halt, dass dieses ein Zusatz zur eigentlichen Nachricht ist.

Lieg ich hiermit richtig? Hab es mir so von der Übersetzung hergeleitet.

Wenn dem so ist, bekomme ich es nicht hin, ein beliebiges Attachment mitzuschicken (in aller Regel Object, da ich unterschiedliche Objekte mitschicken würde), worauf dann die weitere Verarbeitung aufbauen würde. Es kommt hingegen immer ein null-Wert raus.

Lieg ich von meiner Annahme zum Attachment völlig falsch oder gibt es da einen Trick? Hab es immer in die Methode zum Schreiben eingefügt mit dem CompletionHandler, aber ankommen tut nur die Nachricht, nicht das Anhängsel...
 

Kr0e

Gesperrter Benutzer
Das Attachment ist etwas total primitives. Stells dir einfach als Variable vor, die du weiterreichen kannst bei einem read Aufruf.

Bei NIO gabs sowas beim SelectionKey glaub ich auch....


Wenn du channel.read(bytebuffer, attachment, handler<int, attachment type>) aufrufst, dann kannste bei attachment ein Objekt deiner Wahl mitliefern. Der CompletionHandler muss dann eine entsprechende generische Deklaration haben. Ein CompletionHandler mit Object als Attachment kann natürlich jedes Attachment entgegen nehmen. Wenn du aber einen bestimmten Typen hast, kannst ud hinterher das casten sparen. Mehr Sinn hat das nicht...

Ich machs ganz gern so:

Java:
ByteBuffer buffer = ...;

channel.read(buffer, buffer, CompletionHandler<Integer, ByteBuffer> ... );

dann hasse im CompletionHandler direkt den Buffer und kannst damit ne Menge machen. Wobei das eigentlcih nicht die ganze Wahrheit ist... Um nochmal zu Lesen müsstest du ja innerhalb des Handlers Zugriff auf den Channel haben...


Also könnte man durchaus auch sowas machen:

Java:
public class Connection {

     public ByteBuffer readBuffer;
     public AsynchronousByteChannel channel;
}


Connection con = ...;

con.channel.read(con.readBuffer, con, CompletionHandler<Integer, BYteBuffer>... );

Ist jetzt nur grob, hab zur Zeit grad noch ein bissel Arbeit nebenher zu erledigen...


Auf diese Weise kannst ud sogar sehr generischen Code schreiben. Z.b. wäre dir dann beim Lesen egal obs ein AsyncFileChannel oder AsyncSocketChannel is...

Gruß,

Chris
 

Xeonkryptos

Bekanntes Mitglied
Okay, dann hab ich den Sinn hinter dem Attachment richtig verstanden, aber irgendwie will es bei mir nicht funktionieren. Zum Beispiel möchte ich bei der Verbindungsherstellung zwischen dem Client und Server ein Attachment (Meine Identität) weiterreichen und gebe deshalb in der connect-Methode meines Clients das Attachment mit, doch beim Server kommt da einfach eine null-Referenz an...

Serverseite:
Java:
client.accept(attachment,
				new CompletionHandler<AsynchronousSocketChannel, Object>() {

					@Override
					public void completed(AsynchronousSocketChannel result,
							Object attachment) {
						// ... attachment auslesen, etc.
					}

					@Override
					public void failed(Throwable exc, Object attachment) {
					}

				});
Client-Seite: (user = das zu übergebene Attachment)
Java:
server.connect(new InetSocketAddress("localhost", 3141), user,
				new CompletionHandler<Void, Object>() {

					@Override
					public void completed(Void result, Object attachment) {
						// ...
					}

					@Override
					public void failed(Throwable exc, Object attachment) {
					}

				});

Weiß nicht, was jetzt hier das Problem ist... Theoretisch sollte es funktionieren, da Verbindungsaufbau und alles andere keine Probleme macht und das Objekt auch nicht null ist, da ich dieses initialisiert habe...

Edit: Das initialisieren des Attachments beim Server selbst hab ich mir erspart, da dieses sowieso überschrieben wird.
 
Zuletzt bearbeitet:

Kr0e

Gesperrter Benutzer
Ich verstehe nicht so Recht dein Problem, denkst du etwa, das Attachment wird zum Server übertragen ?? Oder wie genau meinst du das ? Das Attachment wird NICHT weitergereicht zum Server. Ich sagte ja bereits, dass das nur eine sehr primitive Funktion ist. Wenn du innerhalb einer JVM eine asynchrone Methode aufrufst, so wird das Callback natürlich auf der selben Seite ausgeführt. Ein Attachment verlässt niemals die JVM!

Außerdem ist deine Namensgebung iwie verwirrend. Warum heißt der Channel auf der Clientseite server. ? Und umgekehrt auf der Serverseite client. ?

Der ServerChanenl akzeptiert asynchron neue Verbindungen. Beim AcceptHandler kommt dann das Attachment an, was du beim ACCEPT-Aufruf mitgegeben hast! Das Attachment ist nur eine Art Verwaltungsutility.. Mehr nicht.^^
 

Xeonkryptos

Bekanntes Mitglied
Dann hatte ich eine falsche Annahme diesbezüglich gehabt. :) Okay, dann haben sich meine Fragen bis jetzt erledigt. Danke dir für die Hilfe! :)
 

Ähnliche Java Themen


Oben