# häppchenweises auslesen "lazy loading" von Datenbank



## beginner99 (10. Nov 2010)

Hallo zusammen,

habe mal wieder eine Frage.
Man siehe dazu auch thread und post

http://www.java-forum.org/allgemein...lele-multithreaded-iteration-ueber-map.html#7

Was das Ziel ist:


Alle Zeilen aus Datenbank auslesen
Anhand davon für jede Zeile etwas berechnen
resultat in Datenbank zurückschreiben

Die "Berechnung" ist extrem zeitintensiv und deshalb ist es zwingend nötig, das Ergebnis zu speichern.
Das ganze sollte am besten Multithreaded ausgeführt werden.

Problem:
Momentan lese ich alles auf einmal und das führt sehr schnell zu out of memory exceptions, zB. mit 60k records muss man  -Xmx1536m setzen.

Ich habe eine Klasse, welche die Berechnung durchführt und eine die für den Datembankzugriff verantwortlich ist. Letztere gibt eine Map<Integer,Molecule> zurück mit allen einträgen. Wie könnte ich das jetzt häppchenweise zurückgeben?

Erwähnt wurde eine Queue aber die müsste die Daten aus der Datenbank gefüttert bekommen.

Ideen?


----------



## fastjack (10. Nov 2010)

Wenn die Daten eine ID, oder irgendeine hochgezählte Nummerierung, haben, könntest Du immer 100, oder 1000 Datensätze selektieren und verarbeiten (where id >= 0 and id < 100 oder id >= 100 < 200 etc.) irgendwie so vielleicht.


----------



## beginner99 (10. Nov 2010)

fastjack hat gesagt.:


> Wenn die Daten eine ID, oder irgendeine hochgezählte Nummerierung, haben, könntest Du immer 100, oder 1000 Datensätze selektieren und verarbeiten (where id >= 0 and id < 100 oder id >= 100 < 200 etc.) irgendwie so vielleicht.



ne id ist nicht zwingend vorhanden. Die Idee des fertigen produktes ist, dass jemand das dann auf seinen vorhandenen Datensatz anwenden kann und was der für Id's hat, kann man nicht wissen. Man könnte das einfach verlangen, dass so ne Spalte erstellt wird, wenn ncith vorhanden, nicht ideal aber auch nicht wirklich problematisch.
(bei oracle wärs einfach mit rowid...)


----------



## beginner99 (11. Nov 2010)

Hatte folgende Idee, wie ich das Problem angehen könnte:

1. Thread erstellen, der die Daten aus der Datenbank liest
2. die Daten kommen in eine Queue mit begrenzter Grösse
3. Brechenungsthreads die von der queue lesen
4. Berechnungsthreads schreiben Resultat in eine Resultatsqueue ohne limit
5. Thread der Daten aus Resultatsqueue in die Datenbank schreibt

Das Problem ist dabei noch, wie ich die 2 Objekte (id, Molecule) in die Queue bringe. jedes mal eine Map mit genau 1 eintrag erstellen?
Und da kommt natürlich gleich die Idee beim schreiben, ich kann ja Maps mit zb. 1000 einträgen in die Queue übergeben.

Gute idee? oder zu komplex?


----------



## maki (11. Nov 2010)

ConcurrentLinkedQueue, BlockingQueue und andere Queue Implementierungen aus dem package [c]java.util.concurrent [/c] solltest du dir imho ansehen.


----------



## beginner99 (11. Nov 2010)

maki hat gesagt.:


> ConcurrentLinkedQueue, BlockingQueue und andere Queue Implementierungen aus dem package [c]java.util.concurrent [/c] solltest du dir imho ansehen.



Ja bin dabei. Ein Problem das ich habe, ist was ich genau mit den InterruptedExceptions anfangen soll.

catchen wo geworfen? oder propagieren? Im letzteren Fall muss man als Callable verwenden anstatt thread oder runnable + Executor service, da man die run() methode nicht mit throws erweitern kann.

Momentan funktioniert es nicht. Im test läuft es zwar durch ohne Exception/fehler aber es passiert nichts, weshalb der test auch scheitert.

Später ev. mehr mit code.


----------



## beginner99 (12. Nov 2010)

WARNUNG: Monster-Post.

Bin am Ende meines Lateins. Ich weiss nicht, wie ich das Problem erklären kann, ohne relativ viel code zu posten.
Problem 1: Im Unit Test blockiert der Code manchmal und manchmal nicht.
Problem 2: Die relevanten Stellen, wo die Arbeit passieren sollte, werden nie erreicht -> Etwas mit Bedingungen stimmt nicht
Ich benutze dbunit + HSQLDB

Code Abschnitt 1:

Ruft dataaccesslayer auf, welcher dann die Daten in einzelnen Maps<Integer, IMolecule> mit Size 1000 an die Queue übergeben sollte.
Danach wird etwas berechnet und sollte mit dem DAL und einer weiteren Queue in die Datenbank zurückgeschrieben werden (soweit kommt es nicht, da nie etwas gelesen wird).

Ich weiss nicht, wie ich die Bedingungen setzten soll, ob der "main thread" weiter versuchen soll von der queue zu lesen. Momentan sollte gelesen werden solange es Elemente in der Queue hat oder der lese-thread aktiv ist. Macht Sinn nur funktioniert es nicht.


```
public int createFingerprints() throws SQLException {

        logger.entry();
        Profiler profiler = new Profiler("Create Fingerprints");
        profiler.setLogger(logger);
        profiler.start("Initialize");
        fingerprints = new HashMap<Integer, Fingerprint>();
        ExecutorService executor =
                Executors.newCachedThreadPool(Executors.defaultThreadFactory());
        final LinkedBlockingQueue<Map<Integer, IMolecule>> readingQueue =
                new LinkedBlockingQueue(5);
        final LinkedBlockingQueue<Fingerprint> savingQueue =
                new LinkedBlockingQueue(10);

        Callable readingCallable = new Callable() {

            @Override
            public Integer call() throws SQLException, InterruptedException {

                return getDataAccessLayer().getAllMolecules(readingQueue);

            }
        };

        Callable savingCallable = new Callable() {

            @Override
            public Integer call() throws SQLException, InterruptedException {

                return getDataAccessLayer().saveFingerprints(savingQueue);

            }
        };

        Future<Integer> moleculesRead = executor.submit(readingCallable); //vermute, dass der thread nicht sofort aktiv wird?
        logger.debug("submited reading callable");
        Future<Integer> fingerprintsSaved = executor.submit(savingCallable);
        logger.debug("submited saving callable");
        Fingerprint fingerprint = null;
        profiler.start("Create and Save Fingerprints");
        logger.debug("readingQueue Size: {}", Integer.toString(readingQueue.size())); // immer 0
        logger.debug("Reading thread active: {}", moleculesRead.isDone()); // meistens true
       // loop wird oft nicht erreicht und wenn, dann waren bedingunen falsch im log oben
      // also 0 und false. Dann endet es in einer Endlosschleife (Abfrage der Queue)
        while (readingQueue.size() > 0 || !moleculesRead.isDone()) { 
            //retry reading if thread was interrupted
            Map<Integer, IMolecule> molecules = null;
            while (molecules == null) {
                try {
                    molecules = readingQueue.poll(100, TimeUnit.MILLISECONDS);
                    logger.debug("Polled reading Queue");
                } catch (InterruptedException ex) {
                    logger.catching(ex);
                    //TODO: proper exception handling
                }
            }
            for (Integer molId : molecules.keySet()) {
                try {
                    logger.debug("creating fingerprint for molecule with id:");
                    logger.debug("MolId: {}", Integer.toString(molId));
                    fingerprint = new Fingerprint(molId,
                            getFingerprinter().getFingerprint(molecules.get(molId)));
                    getFingerprints().put(molId, fingerprint);
                    while (!savingQueue.offer(fingerprint)) {
                        Thread.sleep(100);
                        logger.debug("sleeping: tried offering to savingQueue");
                    }

                } catch (CDKException cdkEx) {
                    getLogger().catching(cdkEx);
                } catch (InterruptedException ex) {
                    getLogger().catching(ex);
                    //TODO: proper exception handling
                }
            }
        }        
        fingerprintsSaved.cancel(true);
        executor.shutdownNow();
        profiler.stop().log();
        logger.debug("Fingerprints Map Size: {}", Integer.toString(fingerprints.size()));
        getLogger().exit(getFingerprints().size());
        return getFingerprints().size();
    }
```

Hier die Methode vom DAL. Auch hier wird nicht der relevante part des codes ausgeführt, im Gegenteil die Methode scheint nie zu Ende zu laufen, aber es gibt auch keinen Fehler.


```
public int getAllMolecules(LinkedBlockingQueue<Map<Integer, IMolecule>> queue)
            throws SQLException, InterruptedException{

        getLogger().entry(queue);
        Profiler profiler = new Profiler("Create Fingerprints");
        profiler.setLogger(getLogger());
        profiler.start("Create and Execute Statment");
        ResultSet resultSet = null;
        Map<Integer, IMolecule> molecules = new HashMap<Integer, IMolecule>();

        Statement stmt = getConnection().createStatement();
        getLogger().debug(getSELECT_ALL_MOLECULES_STATEMENT()); // wird geloggt
        resultSet.setFetchSize(getFetchSize());
        resultSet = stmt.executeQuery(getSELECT_ALL_MOLECULES_STATEMENT());
        SmilesParser smilesParser = new SmilesParser(DefaultChemObjectBuilder.getInstance());
        IMolecule mol;
        int counter = 0;
        profiler.start("Loop through Results");
        while (resultSet.next()) { // loop wird nie ausgeführt
            Integer id = resultSet.getInt(getMolIdColumnName());
            String smiles = resultSet.getString(getStructureColumnName());
            getLogger().debug(smiles); 
            try {
                mol = smilesParser.parseSmiles(smiles);
                molecules.put(id, mol);
                getLogger().debug("Counter: {}", Integer.toString(counter));
                counter++;
            } catch (InvalidSmilesException smilesEx) {
                getLogger().catching(smilesEx);
            } catch (CDKException cdkEx) {
                getLogger().catching(cdkEx);
            }
            getLogger().debug("molecule Map size: ", Integer.toString(molecules.size()));
            if (molecules.size() >= getFetchSize() || resultSet.isLast() ) {
                //getLogger().debug("Ready to offer Map to queue");
                while (!queue.offer(molecules)) {
                    try {
                        Thread.sleep(getReadWaitTime());
                        getLogger().debug("sleeping for {}: could not offer to queue",
                                Integer.toString(getReadWaitTime()));
                    } catch (InterruptedException ex) {
                        stmt.close();
                        throw ex;
                    }
                }

                molecules = new HashMap<Integer, IMolecule>();
            }
        }
        profiler.stop().log(); // wird nicht ausgeführt! Nicht sichtbar im log
        getLogger().exit(counter); 
        return counter; 
    }
```

Das ganze ist ein wenig verwirrend. Ev. sollte ich zurück zu einer Implementirerung mit threads gehen und ohne executor. Das Problem ist dann, dass ich in run keine SQLExceptions weitergeben kann.
(gut, richtiges exception handling ist momentan nicht wirklich existent).

Die ursprüngliche Version, die gut funktioniert für kleine Datensätze, funktioniert noch (auch inder gleichen Testklasse). Es kann also eigentlich nicht am testDatenset liegen.


----------



## beginner99 (12. Nov 2010)

Habe die Fehler gefunden. Eigentlich nur triviales. Das Hauptproblem ist aber im obigen Code, dass die Exceptions in den Threads nicht zum Hauptthread zurück propagiert werden. Deshalb stürtzen die einfach ab ohne Meldung.

IMHO funktioniert das also noch nicht so richtig oder ich mach grundlegend was falsch.

zb. Dieser Ausschnitt mit throws:

```
Callable readingCallable = new Callable() {

            @Override
            public Integer call() throws SQLException, InterruptedException {

                return getDataAccessLayer().getAllMolecules(readingQueue);

            }
        };
```
InterruptedException wird nie gecatch oder geworfen. Im normallfall sollte es also ein compiler fehler geben, was aber nicht passiert. Liegt ev. auch and der anonymen Klasse? Wechseln zu einer normalen inneren Klasse?


----------



## beginner99 (17. Nov 2010)

Hier noch ein paar Kommentare:

Mit JDBC ist es nur bedingt möglich die Kontrolle zu haben, über wie viele Daten ausgelesen werden, da setFetchSize() nicht in allen jdbcDrivern auch etwas tut. Ein Beispiel ist hsqldb, dass immer alle Resultate auf einmal ausliest. Deshalb läuft man sehr schnell in memory Probleme, wenn man alle rows selektiert.
->Problem ist also der hsqldb driver selbst, kann aber auch bei anderen DB drivers auftreten
Obiger approach reduziert den memory Verbrauch dennoch um rund 1/3.

Die Frage ist, ob es Alternativen gibt (Spring, Hibernate,...) wo man das besser kontrollieren kann? Andererseits für meine Zwecke wäre zb. Hibernate bisschen übertrieben, da ich das ORM genau für 1 ObjektTyp brauchen würde, der aus 2 Feldern besteht.

Der pragmatische Ansatz wäre, dass man eine id beginnend mit 1 und mit fortlaufender Nummer verlangt und dann halt anhand dieser ausliest.

Komplizierter wäre es zuerst nur alle ids zu selektieren und weiterzugeben und dann in erst in den Berechnungsthreads die richtigen Daten zu selektieren. Dann hätte man immer nur den Teil im Speicher, der gerade benötigt wird.

Eure Meinungen?


----------



## maki (17. Nov 2010)

> Mit JDBC ist es nur bedingt möglich die Kontrolle zu haben, über wie viele Daten ausgelesen werden, da setFetchSize() nicht in allen jdbcDrivern auch etwas tut. Ein Beispiel ist hsqldb, dass immer alle Resultate auf einmal ausliest. Deshalb läuft man sehr schnell in memory Probleme, wenn man alle rows selektiert.


Das Problem ist weniger JDBC und mehr die DB bzw. wie sie das umsetzt, bei hsqldb heissen die SQL Befehle die du suchst LIMIT und OFFSET bei hsqldb:
Chapter*9.*SQL Syntax


----------



## beginner99 (17. Nov 2010)

maki hat gesagt.:


> Das Problem ist weniger JDBC und mehr die DB bzw. wie sie das umsetzt, bei hsqldb heissen die SQL Befehle die du suchst LIMIT und OFFSET bei hsqldb:
> Chapter*9.*SQL Syntax



Ja, hsqldb hat kein server seitigen cursor und muss deshalb laut Doku immer das ganze resultSet in den Speicher laden.
Problem ist, dass ich das RDBMS unabhängig haben will und hsqldb spezifische Befehle deshalb nicht geeignet sind. Ich brauche hsqldb zum testen, weil es praktisch ist.


----------



## maki (17. Nov 2010)

> Problem ist, dass ich das RDBMS unabhängig haben will und hsqldb spezifische Befehle deshalb nicht geeignet sind.


Dann ist entweder SQL die falsche Wahl, oder du musst für jedes RDBMS andere SQL Befehle verwenden, und somit für jedes zu verwendende RDBMS Queries bereitstellen.

Welche Prod. RDBMS soll denn verwendet werden? (falls es ein bestimmtes Prod. RDBMS gibt, ansonsten sieht es wie gesagt schlecht aus)


----------



## beginner99 (17. Nov 2010)

maki hat gesagt.:


> Dann ist entweder SQL die falsche Wahl, oder du musst für jedes RDBMS andere SQL Befehle verwenden, und somit für jedes zu verwendende RDBMS Queries bereitstellen.
> 
> Welche Prod. RDBMS soll denn verwendet werden? (falls es ein bestimmtes Prod. RDBMS gibt, ansonsten sieht es wie gesagt schlecht aus)



Theoretisch sollte es mit jedem RDBMS gehen, da es eine art library ist (suche nach chemischen Strukturen in einer entsprechenden Datenbank).
Aber da ich das nicht kommerzialisieren will oder so kann ich auch sämtliche Limitationen "weitergeben" falls es nicht anders geht. (andererseits hat ja jeder 0815 laptop 4 Gb Ram) Nur falls jemand eine idee hat, wäre ich auch dankbar.

Frage: Wie macht das zB Hibernate? 
Wenn ich Hibernate + Hsqldb + maxResults verwende, werden dann auch nur maxResults and Rows geladen?


----------



## maki (17. Nov 2010)

> ]Wie macht das zB Hibernate?


Da konfiguriert man den sog. SQL Dialekt, also spezialisiert sich auf das konkrete RDBMS


----------

