import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
* Puffer für eine große Anzahl binärer Daten.<br>
* Die Daten werden auf Datenblocks aufgeteilt, die in einer Queue liegen.
* Ist ein Datenblock voll, wird ein weiterer angelegt. Leseoperationen bewirken, dass ein Datenblock peu à peu "leergelesen"
* wird. Leere Datenblocks werden aus der Liste entfernt.<br>
* Geht der verfügbare Arbeitsspeicher zur Neige, werden Datenblocks, beginnend vom Tail der Queue an, ausgelagert, bis ihre
* Daten gebraucht werden.<br>
* Nur vollbeschriebene Datenblocks werden ausgelagert.
*/
public class BigBuffer {
/**
* Enthält einen Teil der Daten. Kann diese auslagern und wieder einholen.
*/
private static class DataBlock {
private volatile byte [] data;
private volatile int capacity; // Die Kapazität kann man zwar durch data.length ermitteln,
// wenn jedoch die Daten ausgelagert sind, ist data null
// und wir wollen uns beim Wiedereinholen nicht auf die
// Länge der Auslagerungsdatei verlassen
private volatile int writepos, readpos, size; // Schreib/Leseposition, aktuelle Größe der Nutzdaten
private volatile boolean externalized; // true, wenn die Daten ausgelagert wurden
private volatile File file; // Auslagerungsdatei
public DataBlock(int capacity) {
this.capacity = capacity;
data = new byte[capacity];
writepos = 0; readpos = 0; size = 0;
externalized = false;
}
/**
* Ein Block kann nur ausgelagert werden, wenn er voll beschrieben ist und noch kein bißchen
* ausgelesen wurde.
*/
public boolean isExternalizeable() {
return !externalized && isCompletelyFilled() && readpos == 0;
}
/**
* Lagert die Daten aus. Liefert false, wenn fehlgeschlagen.
*/
public void externalize() throws IOException {
File F = File.createTempFile("audiobuffer", ".raw");
try (FileChannel channel = FileChannel.open(F.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
channel.write(ByteBuffer.wrap(data));
file = F;
externalized = true;
data = null;
}
}
/**
* Schreibt Daten in den Block. Es kann passieren, dass der Datenblock die Daten nicht
* komplett aufnehmen kann (weil seine Kapazität erschöpft ist).
* Die Methode liefert die Anzahl Bytes zurück, die geschrieben wurden.
*/
public int offer(byte [] srcBuffer, int offset, int len) {
int rest = data.length - writepos; // Wieviel Daten passen in diesen Block überhaupt noch hinein?
int actualLen = Math.min(len, rest);
System.arraycopy(srcBuffer, offset, data, writepos, actualLen);
writepos += actualLen;
size += actualLen;
return actualLen;
}
/**
* Liest Daten aus dem Block. Wenn nicht soviel Daten vorhanden sind, wie gewünscht, werden nur soviel Daten geliefert,
* wie vorhanden. Die Anzahl der gelesenen Daten wird zurückgeliefert.
*/
public int read(byte [] destBuffer, int offset, int len) throws IOException {
if (externalized) {
internalize();
}
int actualLen = Math.min(len, size);
System.arraycopy(data, readpos, destBuffer, offset, actualLen);
readpos += actualLen;
size -= actualLen;
return actualLen;
}
private void internalize() throws IOException {
try (FileChannel channel = FileChannel.open(file.toPath(), StandardOpenOption.READ)) {
data = new byte[capacity];
int actuallyRead = channel.read(ByteBuffer.wrap(data));
if (actuallyRead != capacity) {
throw new IOException("invalid buffer file: could not read all data");
}
externalized = false;
}
}
/**
* Bytes ohne Einzulesen verwerfen. Liefert die tatsächlich verworfenen Bytes zurück. Das
* können weniger als die angeforderten bytes sein.
*
* Wenn der Datenblock ausgelagert ist und die zu verwerfenden Bytes mindestens der Länge des
* Datenblocks entsprechen, kann die Auslagerungsdatei direkt gelöscht werden. Ist die Anzahl
* zu verwerfender Bytes kleiner als die Datenlänge, müssen die Daten ggf. erst eingeholt werden.
*/
public long discard(long bytes) throws IOException {
if (externalized && bytes >= capacity) {
dispose();
return capacity;
} else {
internalize();
long actualLen = Math.min(bytes, size);
readpos += actualLen;
size -= actualLen;
return actualLen;
}
}
/**
* Liefert true, wenn der Datenblock leergelesen ist. Leergelesen meint nicht, wenn die Nutzdatenlänge 0 ist,
* sondern wenn die Leseposition am Ende dieses Blocks angekommen ist.
*/
public boolean beenReadCompletely() {
return readpos >= capacity;
}
/**
* Liefert true, wenn der Datenblock keine Daten mehr aufnehmen kann.
* @return
*/
public boolean isCompletelyFilled() {
return writepos >= capacity;
}
public void dispose() {
if (file != null && file.exists()) {
file.delete();
}
}
/**
* Convenience-Method. Fragt auch null ab.
*/
public static boolean isExternalizeable(DataBlock block) {
return block != null && block.isExternalizeable();
}
}
/**
* Die Queue bietet einen Zugriff auf den Tail. Außerdem überschreib sie clear, damit die DataBlocks ggf. ihre
* Auslagerungsdateien löschen können.
*/
private static class DataBlockQueue extends ConcurrentLinkedQueue<DataBlock> {
private DataBlock tail = null;
@Override
public boolean offer(DataBlock e) {
tail = e;
return super.offer(e);
}
public DataBlock getTail() {
return tail;
}
@Override
public void clear() {
DataBlock block;
while ((block = poll()) != null) {
block.dispose();
}
super.clear(); // pro Forma. Eigentlich nicht notwendig, super.clear() poll() aufruft, bis null
}
}
private DataBlockQueue queue;
private volatile long currentSize;
private final long capacity;
private int dataBlockSize;
private final static long MEMORYTHRESHOLD = 1024*1024*300; // wenn weniger als soviel Speicher frei, dann auslagern
public BigBuffer(long capacity, int dataBlockSize) {
this.capacity = capacity;
this.dataBlockSize = dataBlockSize;
currentSize = 0;
queue = createQueue();
}
private DataBlockQueue createQueue() {
return new DataBlockQueue();
}
private boolean notEnoughMemory() {
return Runtime.getRuntime().freeMemory() < MEMORYTHRESHOLD;
}
/**
* Fügt len Daten aus srcBuffer ab offset
* am Tail des Puffers an.
* @param buffer Quelle der Daten
* @param offset offset innerhalb des srcBuffer
* @param len Anzahl bytes aus dem srcBuffer
* @throws IOException
*/
public void offer(byte [] srcBuffer, int offset, int len) throws IOException {
if (len + offset > srcBuffer.length) { throw new IllegalArgumentException(String.format("%d (len) + %d (offset) exceeding buffer's length (%d)", len, offset, srcBuffer.length)); }
if (len > availableCapacity()) { throw new IllegalArgumentException(String.format("Data too large. Remaining capacity: %d bytes, data length: %d bytes", availableCapacity(), len)); }
int resultLen = 0;
int usingLen = len;
do {
DataBlock block = queue.getTail();
if (block == null || block.isCompletelyFilled()) { // war die queue noch leer oder ist der Block schon gefüllt
if (DataBlock.isExternalizeable(block) && notEnoughMemory()) {
block.externalize();
}
queue.offer(block = new DataBlock(dataBlockSize)); // neuen Block erzeugen und anhängen
}
int actualLen = block.offer(srcBuffer, offset, usingLen);
resultLen += actualLen;
offset += actualLen; // sollte es einen weiteren Schleifendurchgang geben, muss der offset verschoben werden
usingLen -= actualLen; // und die noch zu schreibende Datenlänge entspr. verringert
} while (resultLen < len);
currentSize += resultLen;
}
/**
* Holt len Daten vom Kopf des Puffers und schreibt sie in den destBuffer ab dem gegebenen offset.
* Die tatsächliche Länge kann niedriger sein als len, wenn weniger Daten zur Verfügung stehen. Kann auch 0 sein.
* Die Anzahl gelesener Bytes wird zurückgeliefert.<br>
* Liefert -1, wenn beim Lesen der Daten ein Fehler auftrat.
*/
public int read(byte [] destBuffer, int offset, int len) throws IOException {
if (len + offset > destBuffer.length) { throw new IllegalArgumentException(String.format("%d (len) + %d (offset) exceeding buffer's length (%d)", len, offset, destBuffer.length)); }
int resultLen = 0;
int usingLen = len;
do {
DataBlock block = queue.peek(); // den Kopf holen, aber noch in der Schlange lassen, da er beim nächsten read evtl. noch gebraucht wird
if (block == null) break; // gibt es keine weiteren Daten, dann Ende
int actualLen = block.read(destBuffer, offset, usingLen);
resultLen += actualLen;
offset += actualLen; // sollte es einen weiteren Schleifendurchgang geben, muss der offset verschoben
usingLen -= actualLen; // und die noch zu lesende Datenmenge entspr. verringert werden
currentSize -= resultLen;
if (block.beenReadCompletely()) { // wenn der Block leergelesen ist, aus der Schlange entfernen
queue.poll();
block.dispose();
} else {
break; // wenn der Block nicht leergelesen ist, dann stehen offenbar keine weiteren Daten mehr zur Verfügung und wir müssen ausbrechen
}
} while (resultLen < len); // konnte der Datenblock noch nicht soviel Daten zur Verfügung stellen wie gewünscht, dann neuer Durchgang mit dem nächsten Block
return resultLen;
}
public void clear() {
currentSize = 0;
Queue<?> oldQueue = queue;
// da das Leeren der Queue etwas teuer ist, wird eine neue,
// leere Queue erzeugt und die alte im Hintergrund geleert. Das kann man nicht dem
// GarbageCollector überlassen, da evtl. ausgelagerte Blöcke auch von der Harddisk
// entfernt werden müssen.
queue = createQueue();
new Thread(() -> oldQueue.clear()).start();
}
public long capacity() {
return capacity;
}
public long availableCapacity() {
return capacity - currentSize;
}
/**
* Verwirft die gegebene Anzahl Bytes, ohne sie zu lesen.
* Liefert zurück, wieviel Bytes tatsächlich verworfen wurden. Das kann weniger sein.
*/
public long discard(long bytes) throws IOException {
long resultLen = 0;
long usingLen = bytes;
do {
DataBlock block = queue.peek(); // den Kopf holen, aber noch in der Schlange lassen, da er beim nächsten read evtl. noch gebraucht wird
if (block == null) break; // gibt es keine weiteren Daten, dann Ende
long actualLen = block.discard(usingLen);
resultLen += actualLen;
usingLen -= actualLen;
currentSize -= resultLen;
if (block.beenReadCompletely()) { // wenn der Block leergelesen ist, aus der Schlange entfernen
queue.poll();
block.dispose(); // ggf. die Auslagerungsdatei löschen.
} else {
break; // wenn der Block nicht leergelesen ist, dann stehen offenbar keine weiteren Daten mehr zur Verfügung und wir müssen ausbrechen
}
} while (resultLen < bytes); // konnte der Datenblock noch nicht soviel Daten zur Verfügung stellen wie gewünscht, dann neuer Durchgang mit dem nächsten Block
return resultLen;
}
public long size() {
return currentSize;
}
}