Du verwendest einen veralteten Browser. Es ist möglich, dass diese oder andere Websites nicht korrekt angezeigt werden. Du solltest ein Upgrade durchführen oder ein alternativer Browser verwenden.
Nun ist mein Problem aber, dass ich ebenfalls EJB verwende.
Das heißt:
Ich möchte auch innerhalb der oben genannten Klasse (der Link) auf eines meiner anderen EJB Klassen machen, was aber so dann nicht geht....
Wie kann man das genau machen?
Wenn ich das richtig sehe habe ich:
MyEmailIdleService
a) Ruft alle Emailadresse ab, bei denen eine Prüfung erfolgen soll
b) Macht dann einen Thread per Emailadresse auf und überwacht diesen
Denke mal nicht - ich hatte die Anforderung bisher nicht, in einer Java EE-Anwendung Postfächer überwachen zu müssen. Da JavaMail sowieso zum Einsatz kommt, frage ich mich halt, warum man nicht verwendet, was die Lib bietet. Sieht auf den ersten Blick jedenfalls recht einfach aus.
In der Doku steht: "The IdleManager is created with a Session, which it uses only to control debug output. A single IdleManager instance can watch multiple Folders from multiple Stores and multiple Sessions."
Aber ich möchte die Möglichkeit haben einzelne Emailadresse auch nicht mehr zu überwachen...
Das brauche ich doch pro Emailadresse einen IdleManager?
Damit ich die stop() - Methode anwenden kann?
Wie kann ich das machen?
Ich könnte ja zB alle IdleManager Objekte in einer Liste speichern. Leider hat IdleManager keine ID, sonst könnte ich sowas machen wie
public void stop(String id){
// Suche ID aus der Liste heraus und wenn auf dieses Objekt dann stop() an...
Du brauchst die stop()-Methode dafür nicht. Die Überwachung durch den IdleManager funktioniert andersrum. Auch das steht in der Doku (den wichtigen Part habe ich mal hervorgehoben): "Note that, after processing new messages in your listener, or doing any other operations on the folder in any other thread, you need to tell the IdleManager to watch for more new messages. Unless, of course, you close the folder."
Heißt: sobald Du irgendwas mit einem überwachten Folder machst, musst Du dem IdleManager erneut sagen, dass er den Folder überwachen soll.
ja, aber wenn ich dann von "außen" eingreifen möchte und die Überwachung eines einzelnen Postfachs stoppen möchte, dann brauche ich doch eine Methode wie stop() ?
Also so ganz komme ich damit leider noch nicht klar.... Ich habe jetzt doch mal den Beispielcode aus meinem ersten Post genommen, da das Grundprinzip hier schon passt - und auch Dinge wie alle 9 Minuten der Server connected wird um einen Timeout vermeiden etc.
Das Überwachen der Email funktioniert, aber mein Problem ist:
a) Aufruf einer anderen EJB dann
Vielleicht mal vom Grundprinzip, was ich machen will:
Ich will mehrere Emailadressen überwachen, ob neue Email vorhanden sind.
Wenn ja (und hier kommt dann der Aufruf einer anderen EJB zum Einsatz), möchte ich in meinem Backend prüfen, ob es zu dem Betreff und dem Sender in meiner Datenbank bereits einen Eintrag gibt.
Wenn ja, dann werden weitere Dinge geschehen (Speicherung in DB etc. - soll jetzt aber nicht das Thema sein)...
Im Moment sieht es nun so aus:
1) TicketEmailWatcherService...
-> Ich denke, diese muss ich später noch mit @Startup deklarieren, dass diese beim Starten der Applikation gestartet wird und die startListener() - Methode mit @PostConstruct ?
Hier hole ich mir die Emailadressen aus der Datenbank, die überprüft werden sollen.
Ebenfalls erstelle ich das Store - Objekt, um mich später zu connecten....
Grundproblem:
Am liebsten wäre mir in der "TicketEmailWatcherService", dass ich dann eine Liste der Emails zurückbekomme, die neu sind.
Und dann kann ich mit den "Message" - Objekten machen, was ich will. Nachricht in der DB speichern usw.
Aber wie kann ich das machen??
Java:
public class TicketEmailWatcherService {
private final Logger LOGGER = LoggerFactory.getLogger(TicketEmailWatcherService.class);
@EJB
private TicketService ticketService;
@EJB
private EmployeeService employeeService;
@EJB
private EmailSettingPasswordService emailSettingPasswordService;
@EJB
private TicketEmailSettingService ticketEmailSettingService;
public void startListener()
throws InvalidKeyException, NoSuchAlgorithmException, NoSuchPaddingException, IllegalBlockSizeException,
BadPaddingException, EmailSettingNotFoundException, IOException, MessagingException {
LOGGER.info("START startListener");
List<TicketEmailSetting> list = ticketEmailSettingService.findAllTicketEmailSettingListByStatusActive();
if (list == null || list.isEmpty())
return;
for (TicketEmailSetting ticketEmailSetting : list) {
Store store = connect(ticketEmailSetting.getEmailSetting());
MailPushEventHandler mailEventHandler = new MailPushEventHandler(store);
}
LOGGER.info("END startListener");
}
Java:
import java.io.IOException;
import javax.mail.Message;
import javax.mail.MessagingException;
import javax.mail.Store;
public class MailPushEventHandler extends MailPushEmpfaenger {
public MailPushEventHandler(Store store) {
super(store);
}
@Override
public void onMailReceived(Object[] messages) {
if (messages != null && messages.length > 0) {
for (Object msg : messages) {
try {
String textFromMessage = getTextFromMessage((Message) msg);
} catch (MessagingException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
[CODE=java]public class MailPushEmpfaenger {
private final Logger LOGGER = LoggerFactory.getLogger(MailPushEmpfaenger.class);
private String randomInstanceID;
private boolean stopped;
private IMAPFolder inbox;
private Thread listeningThread;
private Thread reconnectionThread;
/**
* Constructor
*
* @param host
* @param username
* @param password
*/
public MailPushEmpfaenger(Store store) {
randomInstanceID = String.valueOf(new Random().nextInt(Integer.MAX_VALUE));
listeningThread = new Thread(new Runnable() {
@Override
public void run() {
while (!stopped) {
try {
LOGGER.info("[MailRX " + randomInstanceID + "] Connecting and logging in...");
// // Connect with IMAP Server
// Properties props = new Properties();
// props.setProperty("mail.store.protocol", "imaps");
// Session session = Session.getDefaultInstance(props, null);
// Store store = session.getStore("imaps");
// store.connect(host, username, password);
LOGGER.info("[MailRX " + randomInstanceID + "] Connected.");
// Open inbox
inbox = (IMAPFolder) store.getFolder("INBOX");
inbox.open(IMAPFolder.READ_WRITE);
// initial Mail Check
checkNewMessages();
while (!stopped) {
LOGGER.info("[MailRX " + randomInstanceID + "] IDLE-ing");
inbox.idle(true);
checkNewMessages();
}
} catch (MessagingException e) {
e.printStackTrace();
}
}
}
});
reconnectionThread = new Thread(new Runnable() {
@Override
public void run() {
while (!stopped) {
try {
// Sleep 9 minutes
Thread.sleep(9 * 60 * 1000);
LOGGER.info("[MailRX " + randomInstanceID + "] NOOP");
inbox.doCommand(new IMAPFolder.ProtocolCommand() {
@Override
public Object doCommand(IMAPProtocol protocol) throws ProtocolException {
protocol.simpleCommand("NOOP", null);
return null;
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
}
});
listeningThread.start();
reconnectionThread.start();
}
/**
* Check for new Emails
*
* @throws MessagingException
*/
private Object[] checkNewMessages() throws MessagingException {
LOGGER.info("[MailRX " + randomInstanceID + "] Checking for new messages...");
if (!inbox.isOpen()) {
inbox.open(IMAPFolder.READ_WRITE);
}
if (inbox.getMessageCount() > 0) {
ArrayList<Message> msgs = new ArrayList<Message>();
Message[] tempMsgs = inbox.getMessages(inbox.getMessageCount() - 10, inbox.getMessageCount());
for (Message tempMsg : tempMsgs) {
if (!tempMsg.isSet(Flags.Flag.SEEN)) {
msgs.add(tempMsg);
tempMsg.setFlag(Flags.Flag.SEEN, true);
}
}
onMailReceived(msgs.toArray());
LOGGER.info("[MailRX " + randomInstanceID + "] Forwarded " + msgs.size() + " E-Mails.");
return msgs.toArray();
}
return null;
}
public void onMailReceived(Object[] messages) {
// Overwrite me
}
/**
* Return the primary text content of the message.</br>
* Source: https://javaee.github.io/javamail/FAQ#mainbody
*
* @param p The message
* @return The message's main body text content
* @throws MessagingException If something went wrong
* @throws IOException If something else went wrong
*/
public String getTextFromMessage(Part p) throws MessagingException, IOException {
if (p.isMimeType("text/*")) {
String s = (String) p.getContent();
return s;
}
if (p.isMimeType("multipart/alternative")) {
Multipart mp = (Multipart) p.getContent();
String text = null;
for (int i = 0; i < mp.getCount(); i++) {
Part bp = mp.getBodyPart(i);
if (bp.isMimeType("text/plain")) {
if (text == null)
text = getTextFromMessage(bp);
continue;
} else if (bp.isMimeType("text/html")) {
String s = getTextFromMessage(bp);
if (s != null)
return s;
} else {
return getTextFromMessage(bp);
}
}
return text;
} else if (p.isMimeType("multipart/*")) {
Multipart mp = (Multipart) p.getContent();
for (int i = 0; i < mp.getCount(); i++) {
String s = getTextFromMessage(mp.getBodyPart(i));
if (s != null)
return s;
}
}
return null;
}
public void stop() {
this.stopped = true;
}
}
Die EJB Spec "verbietet", dass Du selbst Threads erstellst. Natürlich kann Dich die Spec nicht daran hindern, aber Du solltest keinesfalls aus einem solchen Thread Dinge verwenden, die unter der Fuchtel des Application Servers stehen. Außerdem ist es keine gute Idee, für jede Verbindung einen eigenen Thread zu verbraten.
Wenn Du trotzdem daran festhalten willst, würde mir spontan einfallen, eine LinkedBlockingQueue aus diesen Threads heraus zu füllen und diese Queue aus einem managed Thread (ManagedExecutorService) heraus zu lesen. Die EJB und Deine Threads teilen sich also eine Queue.
Nur als kleine Info am Rande: achte doch bitte darauf, deine Beiträge in einem halbwegs passenden Unterforum zu erstellen, dies hier ist ebenso wie deine anderen Beiträge alles andere als ein Anfängerthema
a) Was benötige ich dann noch von dem obigen Code? Muss ich das mit den Threads komplett löschen?
b) Was macht: Object obj = messages.take();
c) Wie kann ich einzelne Postfächer stoppen? (Watch stoppen) ?
d) Was ist mit: queue.offer() gemeint?
a) alles - es ging ja nur darum, eine Verbindung zwischen den von Dir und den vom Application Server verwalteten Threads herzustellen, um auf die Ressourcen (wie EJBs) zugreifen zu können. Diese Verbindung erfolgt über eine Queue. Daher:
Das Prinzip ist ganz einfach: Du hast einen Postfach-Thread. Der erhält irgendwann neue Nachrichten und schiebt diese in einen FIFO-Puffer (Queue). Auf der anderen Seite hast Du einen vom Application Server verwalteten Worker-Thread, der einfach eine Nachricht nach der anderen aus der Queue liest und damit irgendwas macht.
OK - also habe ich immernoch Threads, die dann aber vom Application Server bearbeitet werden...
Und wie sieht der Aufbau dann der Klassen aus? Welche Modifikation brauche ich in den oberen Klassen?
a) Wie starte ich denn den Listener, der prüft, ob neue Email da sind oder nicht?
b) Im Idealfall rufe ich dann in der public void onMailReceived(Object[] messages) eine Methode via EJB auf, die dann weiteres macht (Ticket erstellen aus den Infos der Message)
Demnach bräuchte ich noch eine Instanzierung in public class MailPushEventHandler extends MailPushEmpfaenger ?
c) Wie muss doSomethingWith(obj); abgeändert werden?
a) Das passiert bei der Erzeugung Deines Handlers doch automatisch
b) Nein, im Idealfall rufst Du in onMailReceived die offer-Methode der Queue auf.
c) doSomethingWith(obj) ist lediglich ein Platzhalter, den Du halt mit den für Dich passenden Anweisungen ersetzen musst.
Das habe ich in #16 bereits beschrieben. Der Code aus #16 muss in Deine EJB (TicketEmailWatcherService). Bei der Erzeugung des MailPushEventHandlers musst Du die messages-Queue mitgeben (-> Anpassung des Konstruktors von MailPushEventHandler). Und in MailPushEventHandler#onMessageReceived legst Du das zu verarbeitende Objekte in die Queue mit der offer-Methode.
welche Klassen stecken denn hinter: private Queue<Object> messages; ?
Bekomme in Eclipse einen Fehler, dass er "take" nicht findet...
"The method take() is undefined for the type Queue<Object>"
List<TicketEmailSetting> list = ticketEmailSettingService
.findAllTicketEmailSettingListByStatusActive();
if (list == null || list.isEmpty())
return;
for (TicketEmailSetting ticketEmailSetting : list) {
Store store = connect(ticketEmailSetting.getEmailSetting());
MailPushEventHandler mailEventHandler = new MailPushEventHandler(store, messages);
}
gehört nicht in den Thread. Der Code dient ja dem Start der Überwachung.
Fast. Nimm den try-catch-Block (EDIT: der auf Ebene von initStartTicketListener) nach unten, dann ist messages auch schon initialisiert, wenn Du den Konstruktor von MailPushEventHandler aufrufst
Aktuell entnimmst Du nur das Objekt aus der Queue, machst aber damit noch nichts. Du könntest an der Stelle die Nachricht in der DB speichern, oder was auch immer Du damit anfangen willst.
Dann bleibt jetzt aber noch die Frage wie ich die Überwachung von einzelnen Postfächer stoppe?
In der TicketEmailWatcherService schwebt mir sowas vor wie:
public void stop(String emailAddress){
???
}
Was ich brauche ist eine eindeutige ID / Emailadresse, in der ich das "watching" dann eines MailPushEventHandler stoppe?
Läuft nun super. Vielen vielen Dank.
Muss ich den Handler noch initialisieren? Wenn ja, wie? Map<String, MailPushEventHandler> handlers
Ich habe noch ein Problem bei folgendem Code, welcher allerdings nur ein paar Mal auftritt.
Ich schätze, wenn ich die Email schon gelesen habe oder ähnliches.
Java:
if (inbox.getMessageCount() > 0) {
ArrayList<Message> msgs = new ArrayList<Message>();
Message[] tempMsgs = inbox.getMessages(inbox.getMessageCount() - 10, inbox.getMessageCount());
for (Message tempMsg : tempMsgs) {
if (!tempMsg.isSet(Flags.Flag.SEEN)) {
msgs.add(tempMsg);
tempMsg.setFlag(Flags.Flag.SEEN, true);
}
}
Exception kommt hier: for (Message tempMsg : tempMsgs) {
Folgende Exception wird geworfen: javax.mail.MessageRemovedException
D. h. dass Du nicht je Verbindung einen Thread hast, sondern ein Thread für zig Verbindungen gleichzeitig zuständig ist. Der IdleManager aus #2 arbeitet mit non-blocking I/O.
Wenn Du den IdleManager (s. #2) verwendest und keine Threads, bekommst Du non-blocking I/O gratis. Vielleicht habe ich Dich auch missverstanden und Du willst wissen, wie man selbst mit non-blocking I/O verwendet. Unter http://tutorials.jenkov.com/java-nio/index.html scheint es ein Tutorial dazu zu geben.
ja, ich verwende den IdleManager.
Dann passt es doch schon bereits? Oder verstehe ich dich jetzt falsch?
Die Frage, die ich mir eben stelle ist, wenn ich bspw. 1000 Emailadressen überwache, ob ich Probleme bekomme im Server.
Dass es dann vllt zu einer Verzögerung bei der Überwachung kommt, ist dann verständlich.
Die Frage ist dann eben, ob mehr Server (mehr Kapazität, mehr Threads) dies dann ausgleichen?
Die Frage, die ich mir eben stelle ist, wenn ich bspw. 1000 Emailadressen überwache, ob ich Probleme bekomme im Server.
Dass es dann vllt zu einer Verzögerung bei der Überwachung kommt, ist dann verständlich.
Die Frage ist dann eben, ob mehr Server (mehr Kapazität, mehr Threads) dies dann ausgleichen?
Habe hierzu nochmal eine allgemeine Frage.
Ich würde gerne in meiner Applikation einen "Scheduler" haben. Also um konkret zu sein:
- Ich habe eine DB - Tabelle "MySchedules". Hier steht dann das "plannedExecutionDate".
Im Moment habe ich eine Methode mit @Schedule annotiert, die dann jede Minute läuft.
Hier wird geprüft, ob es in der DB Einträge in der "MySchedules" gibt, die noch nicht losgelaufen sind und das "plannedExecutionDate" kleiner als die jetztige Uhrzeit ist.
Wenn das zutrifft, dann wird das Objekt von "MySchedules" in eine Liste von MySchedules hinzugefügt und anschließend eine entsprechende Methode aufgerufen (Kunde erstellen whatever):
Java:
@Timeout
@AccessTimeout(value = 20, unit = TimeUnit.MINUTES)
@Schedule(minute = "*/1", hour = "*", persistent = false)
public void execute() throws Exception {
LOGGER.info("START SCHEDULER");
try {
if (currentlyRunning) {
LOGGER.warn("Scheduler is currently running...");
return;
}
if (schedulingService == null)
schedulingService = new SchedulingService();
currentlyRunning = true;
/**
* Alle noch nicht ausgeführten Schedules bekommen
*/
List<ScheduleExecution> scheduleExecutionList = scheduleExecutionService
.findAllScheduleExecutionOutstanding();
if (scheduleExecutionList == null || scheduleExecutionList.size() == 0) {
LOGGER.info("NO PLANNED SCHEDULES FOUND");
currentlyRunning = false;
return;
}
LOGGER.info("FOUND SCHEDULES: " + scheduleExecutionList.size());
for (ScheduleExecution scheduleExecution : scheduleExecutionList) {
synchronized (this) {
schedulingService.addTaskToExecutor(taskId, new Runnable() {
@Override
public void run() {
try {
// executeAction(scheduleExecution);
} catch (Execption e) {
LOGGER.error(ExceptionUtils.getFullStackTrace(e));
}
}
}, 0);
}
}
currentlyRunning = false;
}
catch (Exception e) {
LOGGER.error(ExceptionUtils.getFullStackTrace(e));
currentlyRunning = false;
}
LOGGER.info("END SCHEDULER");
}
Meine Klasse "SchedulingService" sieht so aus:
Java:
public class SchedulingService {
private static Map<String, ScheduledFuture> scheduleFutures = new HashMap<>();
private ScheduledExecutorService scheduledExecutorService = null;
private int executorPoolSize = 5;
/**
*
* Will create ScheduledService Object
*
*/
public void createScheduledExecutor(int executorPoolSize) {
scheduledExecutorService = Executors.newScheduledThreadPool(executorPoolSize);
addRecurringTaskToExecutor("removeStopedScheduledFuturesFromMap", new Runnable() {
@Override
public void run() {
List<String> tasksToBeRemoved = new ArrayList<>();
synchronized (scheduleFutures) {
scheduleFutures.keySet().stream().filter(task -> {
return scheduleFutures.get(task).isDone();
}).forEachOrdered(task -> {
tasksToBeRemoved.add(task);
});
tasksToBeRemoved.stream().forEach(key -> {
scheduleFutures.remove(key);
});
}
}
}, 0, 10);
}
/**
*
* Will Stop the ScheduledService
*
*/
public void stopScheduledExecutor() {
scheduledExecutorService.shutdown();
}
/**
*
* return ScheduledService Object if exists otherwise first create the object
* then return
*
*/
public ScheduledExecutorService getExecutorServiceObject() {
if (scheduledExecutorService == null) {
createScheduledExecutor(this.executorPoolSize);
}
return scheduledExecutorService;
}
/**
*
* Add new tasks to executors
*
*/
public void addRecurringTaskToExecutor(String taskID, Runnable event, int initialDelay, int periodicDelay) {
synchronized (scheduleFutures) {
System.out.println("Scheduling new task : " + taskID);
scheduleFutures.put(taskID, getExecutorServiceObject().scheduleAtFixedRate(event, initialDelay,
periodicDelay, TimeUnit.SECONDS));
}
}
public void addTaskToExecutor(String taskID, Runnable event, int initialDelay) {
synchronized (scheduleFutures) {
System.out.println("Scheduling new task : " + taskID);
scheduleFutures.put(taskID, getExecutorServiceObject().schedule(event, initialDelay, TimeUnit.SECONDS));
}
}
public void addTaskToExecutor(String taskID, Runnable event) {
addTaskToExecutor(taskID, event, 0);
}
/**
*
* Will remove tasks from scheduler
*
*/
public void removeTaskFromExecutor(String taskID, boolean forceStop) {
synchronized (scheduleFutures) {
System.out.println("Stopping " + taskID);
if (scheduleFutures.containsKey(taskID))
scheduleFutures.get(taskID).cancel(forceStop);
else
System.out.println(taskID + " Already Stopped");
}
}
}
Nun meine Frage, macht das Sinn?
Weil du vorher was geschrieben hast, dass man nicht selbst Threads anlegen soll?