Consumer-Producer bauen

Fohnbit

Top Contributor
Hallo!

ich möchte einen Producer/Consumer bauen.
1. Die Hauptklasse empfängt ein bytes array. Diese soll sofort dem producer übergeben werden. Dieser bearbeitet diese noch etwas und legt es in die Queue.

2. Der Consumer wartet darauf das neue Elemente in die Queue gelegt werden.
Wenn ein neuer Wert, beginnt er den Wert abzuarbeiten. Das dauert sicher länger als neue Werte ankommen.

3. Nach Bearbeitung des einen Wert soll er das modifizierte Bytearray an die Main Klasse zurückgeben, welche diese weiter sendet und eine Antwort erwarten => 4

4. Es muss eine Antwort von Punkt 3. innerhalb von 100ms geben.
Wenn ja, Antwort an die Mainklasse zurückgeben.
Wenn nein, Timeout melden und nächsten Queue abarbeiten.

Wie muss ich das aber Umsetzen? Vor allem die Rückgabe an die Main Klasse?
Und es soll auf die Msg ja auch eine Antwort mit einem Timeout geben.
Java:
// Queue Message
public class Message {
	private byte[] msg;
    
    public Message(byte[] b){
        this.msg=b;
    }
 
    public byte[] getMsg() {
        return msg;
    }
 
}

Java:
public class Producer implements Runnable {
	private BlockingQueue<Message> queue;

	public Producer(BlockingQueue<Message> q) {
		this.queue = q;
	}

	public Message msg;

	public void setMessage() {
		try {
			queue.put(msg);
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}

	}

	@Override
	public void run() {
			
	}
	
}

Java:
public class Consumer implements Runnable {
	private BlockingQueue<Message> queue;

	public Consumer(BlockingQueue<Message> q) {
		this.queue = q;
	}

	@Override
	public void run() {
		try {
			while (true) {
				Thread.sleep(10);
				System.out.println("Consumed " + queue.take().getMsg());
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}

Main Klasse
Java:
	BlockingQueue<Message> queue = new ArrayBlockingQueue<Message>(10);
		Producer producer = new Producer(queue);

		for (int i = 0; i < 10; i++) {
			Message msg = new Message(new byte[] { 0x31, 0x32 });
			producer.msg = msg;
			producer.setMessage();
		}
		Consumer consumer = new Consumer(queue);
		new Thread(producer).start();
		new Thread(consumer).start();
 

Joose

Top Contributor
1. Die Hauptklasse empfängt ein bytes array. Diese soll sofort dem producer übergeben werden. Dieser bearbeitet diese noch etwas und legt es in die Queue.

2. Der Consumer wartet darauf das neue Elemente in die Queue gelegt werden.
Wenn ein neuer Wert, beginnt er den Wert abzuarbeiten. Das dauert sicher länger als neue Werte ankommen.

3. Nach Bearbeitung des einen Wert soll er das modifizierte Bytearray an die Main Klasse zurückgeben, welche diese weiter sendet und eine Antwort erwarten => 4

4. Es muss eine Antwort von Punkt 3. innerhalb von 100ms geben.
Wenn ja, Antwort an die Mainklasse zurückgeben.
Wenn nein, Timeout melden und nächsten Queue abarbeiten.

Wie muss ich das aber Umsetzen? Vor allem die Rückgabe an die Main Klasse?
Und es soll auf die Msg ja auch eine Antwort mit einem Timeout geben.

Gibt es (bei dir) einen Unterschied zwischen Main Klasse und Hauptklasse?
Um es klar zustellen:
Deine Main Klasse bekommt ein byte[] gibt diese an den Producer weiter, dieser bearbeitet diese Array etwas und gibt es an den Consumer weiter, dieser bearbeitet es wieder und gibt es dann an die Main Klasse weiter.
Diese schickt es wohin und wartet auf eine Antwort?

Normalerweise wird das Procuder/Consumer Konstrukt für parallel Abläufe eingesetzt um Zeit zu sparen. Da du meinst etwas wartet bevor die Queue weiter abgearbeitet wird sehe ich hier keinen wirklichen Gebrauch von Producer/Consumer ... einfach seriell deine Aufgaben durchführen und fertig.

Java:
public class Producer implements Runnable {
	private BlockingQueue<Message> queue;

	public Producer(BlockingQueue<Message> q) {
		this.queue = q;
	}

	public Message msg;

	public void setMessage() {
		try {
			queue.put(msg);
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}

	}

	@Override
	public void run() {
			
	}
	
}

Java:
for (int i = 0; i < 10; i++) {
			Message msg = new Message(new byte[] { 0x31, 0x32 });
			producer.msg = msg;
			producer.setMessage();
		}

Bitte überarbeite diesen Teil nochmal! Mache das Attribut "msg" von Producer private und übergib der Methode "setMessage" stattdessen einen Parameter!

Java:
	BlockingQueue<Message> queue = new ArrayBlockingQueue<Message>(10);
		Producer producer = new Producer(queue);

		for (int i = 0; i < 10; i++) {
			Message msg = new Message(new byte[] { 0x31, 0x32 });
			producer.msg = msg;
			producer.setMessage();
		}
		Consumer consumer = new Consumer(queue);
		new Thread(producer).start();
		new Thread(consumer).start();


Verwende für Producer bzw. Consumer unterschiedliche ArrayBlockingQueue Instanzen. Wenn beide die gleiche Instanz verwenden kann es passieren das der Consumer anfangt zu arbeiten bevor die Message vom Producer bearbeitet wurde.
 

Fohnbit

Top Contributor
Hallo!

Vielen Dank für deine Antwort:
Main Klasse und Hauptklasse sind ident, ja!

Ich benötige es um mehreren serielle Klassen Zugriff auf einen seriellen Port zu gewähren.
Da jede gesendeter Request eine Antwort bekommt, muss ich alle Requests in eine Queue "Zwischenspeichern" bis diese an der reihe sind.
Verschiedene queues? Ok, mache ich dann noch.

Soweit:

Messageklasse die die Bytes für den Comport definiert
Java:
public class Message {
	private byte[] msg;
    
    public Message(byte[] b){
        this.msg=b;
    }
 
    public byte[] getMsg() {
        return msg;
    }
 
}

//Interface für Datenrückgabe per Listener
Java:
public interface SendingListener {
	void onNewSendingValue(byte[] b);
}

//Producer übergibt Daten an die Queue
Java:
public class SendingProducer implements Runnable {
	private BlockingQueue<Message> queue;

	public SendingProducer(BlockingQueue<Message> q) {
		this.queue = q;
	}

	public void setMessage(Message msg) {
		try {
			queue.put(msg);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

	@Override
	public void run() {
		while (true) {
			try {
				Thread.sleep(20);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}

	}
}

// Der Consumer der die Daten zurückgibt zum tatsächlichen senden
Java:
public class SendingConsumer implements Runnable {
	
	SendingConsumer(SendingListener l){
		this.listener = l;
	}
	private BlockingQueue<Message> queue;
	private SendingListener listener;

	public SendingConsumer(BlockingQueue<Message> q) {
		this.queue = q;
	}

	public void listenerValueFire(byte[] b) {
		listener.onNewSendingValue(b);
	}

	@Override
	public void run() {
		try {
			while (true) {
				listenerValueFire(queue.take().getMsg());
				Thread.sleep(1000);
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}

In der Mainklasse
Java:
BlockingQueue<Message> queue = new ArrayBlockingQueue<Message>(10);
	SendingProducer sendingProducer = new SendingProducer(queue);
	SendingConsumer sendingConsumer = new SendingConsumer(queue);

new Thread(sendingProducer).start();
		new Thread(sendingConsumer).start();

// Daten an serielle senden
@Override
	public void onNewSendingValue(byte[] b) {
		updateOutputSerdata(b);
	}

// TEST Daten empfangen zum senden an die Queue
	public void onRequest(byte[] value) {
			Message msg = new Message(value);
			sendingProducer.setMessage(msg);
	

	}


Aber wie weiß der Consumer dann ob eine Antwort aus der MainKlasse empfangen wurde?
 

Joose

Top Contributor
Verschiedene queues? Ok, mache ich dann noch.

Ja wenn beide die gleiche Queue verwenden und du fügst das 1.Element zur Queue hinzu: Wie stellst du nun sicher das zuerst der Producer arbeitet und dann erst der Consumer?

Vielen Dank für deine Antwort:
Main Klasse und Hauptklasse sind ident, ja!

Ich benötige es um mehreren serielle Klassen Zugriff auf einen seriellen Port zu gewähren.
Da jede gesendeter Request eine Antwort bekommt, muss ich alle Requests in eine Queue "Zwischenspeichern" bis diese an der reihe sind.

Gut das du dein Problem nochmals genauer beschrieben hast: Producer/Consumer passt zu der beschrieben Problemstellung überhaupt nicht.
Mehrere Programme/Klasse können Messages an einen Port schicken: Dein Programm soll diese Messages der Reihe nach wie sie ankommen abarbeiten.

Sprich es würde hier eine Klasse vollkommen reichen:

Java:
public class PortWorker implements Runnable {

    private BlockingQueue<Message> messages;
    private boolean running;

    public PortWorker() {
        messages = new BlockingQueue<Message>();
    }

    public void run() {
        running = true;
        while(running) {
            Message next = messages.take(); // hier wird gewartet bis etwas in die Queue gelegt wird
            work1(next); // hier erledigst du deine manipulation vom Producer
            work2(next); // hier erledigst du deine manipulation vom Consumer
            sendBack(next); // in dieser methode wird die Message zurück geschickt und auf die antwort gewartet (max 100ms), dann wird die schleife weiter abgearbeitet        
        }
    }
}

Anmerkung: Verwende nie Endlosschleifen ala while(true), das ist eine schlechte Programmierung. Auch wenn dein Thread angeblich ewig laufen wird, sollte es immer die Möglichkeit geben diesen sauber zu beenden in dem man die Abbruchbedingung setzt!
 

Fohnbit

Top Contributor
Hallo Joose!

Ok, danke! Dann baue ich das um.
Aber wie lege ich in deinem Beispiel etwas in die Queue? Diese ist private und wird im Konstruktor neu generiert?
 

Joose

Top Contributor
Klar habe vergessen zu erwähnen das in meinem Beispiel Methoden ala "addNewMessage(Message m)" fehlen (sowie mögliche getter/setter).
Wie läuft den aktuell der Empfang von neuen Message ab?
 

Fohnbit

Top Contributor
Ich baue gerade die Klassen um. Aber blockiert der Tread sich nicht selbst, wenn er auch die Antwort wartet?
Er soll ja weiterhin zu sendete Daten in die Queue legen.

Um zu warten dachte ich an:
Java:
waitForAnswer = true;
				if (answerTimeout == null) {
					answerTimeout = executor.schedule(
							new Runnable() {
								public void run() {
									waitForAnswer = false;
								}
							}, 1000, TimeUnit.MILLISECONDS);

				}
				while (waitForAnswer) {
					// Wait for answer
				}

				if (answerTimeout != null) {
					answerTimeout.cancel(true);
					answerTimeout = null;
				}

Wenn eine Antwort einrtifft, wird "waitForAnswer" true, oder nach 100ms schaltet der Timer die Variable auf true.
Jedoch blockiert meine Methode hier, oder?
 
Zuletzt bearbeitet:

Fohnbit

Top Contributor
Ablauf:
Es kommen in unterschiedlichen Intervallen bytes[] zum senden an eine serielle Schnittstelle, die alle eine Antwort zurücksenden.

Nehmen wir mal 10 Sender an. Diese pollen in unterschiedlichen Intervallen den seriellen Port.
Da kann es eben passieren das sich 2 Abfragen zeitlich überschneiten.

Jeder Abfrage muss die passende Antwort zurückgesendet werden.
Die Antwort muss in einer passenden Zeitspanne eintreffen, sonst kann man diese verwerfen.
Nach einer Antwort muss man auch einige Millisekunden warten bis die nächste Abfrage auf den seriellen Bus gesendet werden kann.
 

Joose

Top Contributor
Achtung: Bei der 1.Erklärung bin ich davon ausgegangen das es sich um Programm handelt welches auf diesen Port horcht, die Empfangenen Arrays bearbeitet, zurückschickt und wieder auf eine Antwort oder ähnliches wartet.

Bei deiner neuen/aktuellen Erklärung handelt es sich doch eher um eine Art Client/Server Problem. Hier passt ein Producer/Consumer schon gar nicht und meine Möglichkeit oben auch nicht.
 
Ähnliche Java Themen
  Titel Forum Antworten Datum
F Producer/Consumer oder reicht synchronizedList Java Basics - Anfänger-Themen 9
A Thread Producer - Consumer Java Basics - Anfänger-Themen 1
I Threads Multithreading, Producer/Consumer, notify() Java Basics - Anfänger-Themen 6
B Threads Producer-Consumer Problem Java Basics - Anfänger-Themen 3
M Producer / Consumer mal anders. Java Basics - Anfänger-Themen 7
J Methoden Observer-Pattern mit Consumer und accept( ) Java Basics - Anfänger-Themen 6
DeVolt Interface Frage zu Consumer Java Basics - Anfänger-Themen 7
M mit Maven eine ausführbare Jar bauen Java Basics - Anfänger-Themen 7
B Hashmap richtig bauen, die Tripel auf Zahl abbildet? Java Basics - Anfänger-Themen 10
B 49-bit-zahl mit genau 6 Einsen bauen? Java Basics - Anfänger-Themen 21
R Ist es möglich, ein Zahlungs-Gateway mit Java zu bauen? Java Basics - Anfänger-Themen 11
Y Methoden Wie kann ich eine if-Abfrage bei Setters bauen? Java Basics - Anfänger-Themen 6
J Timer bauen, Main Methode immer wieder neu starten Java Basics - Anfänger-Themen 13
S Index File bauen Java Basics - Anfänger-Themen 5
O Erste Versuche eine Art "EventHandler" zu bauen.. Java Basics - Anfänger-Themen 8
S Frage Chat programm bauen Java Basics - Anfänger-Themen 5
E String aus Variablen bauen Java Basics - Anfänger-Themen 11
E Input/Output einfachen Socket für XML-Anfragen bauen Java Basics - Anfänger-Themen 13
B Theater bauen und ausgeben Java Basics - Anfänger-Themen 8
G bauen einer jar mit einem JFrame Java Basics - Anfänger-Themen 17
A "Refresher" bauen? Java Basics - Anfänger-Themen 2
A Java Chat Bauen !?!? Java Basics - Anfänger-Themen 10
U Einen Interpreter bauen? Java Basics - Anfänger-Themen 12

Ähnliche Java Themen

Neue Themen


Oben