Hallo!
Ich bekomme über eine Methode Werte (OSGi Plugin) in meinen Code, welchen ich in eine InfluxDB übers Web schreibe.
Das schreiben soll alle 30s erfolgen.
Nun habe ich das Problem, wenn ein Internetausfall ist, die Queue an die 26.000 Einträge hat und das Programm dann blockt. Ich vermute das das Hochladen zu lange dauert und die ankommenden neuen Daten einen Fehler verursachen.
Vermutlich ist mein ganzer Ablauf nicht korrekt und Threadsicher (oder wie man das beschreibt)
Erstellen der synchronizedList und die Methode, die die List füllt.
Der Aufruf zum hochladen in einem eigenen Thread:
Die Uploadklasse selbst
Ich bekomme über eine Methode Werte (OSGi Plugin) in meinen Code, welchen ich in eine InfluxDB übers Web schreibe.
Das schreiben soll alle 30s erfolgen.
Nun habe ich das Problem, wenn ein Internetausfall ist, die Queue an die 26.000 Einträge hat und das Programm dann blockt. Ich vermute das das Hochladen zu lange dauert und die ankommenden neuen Daten einen Fehler verursachen.
Vermutlich ist mein ganzer Ablauf nicht korrekt und Threadsicher (oder wie man das beschreibt)
Erstellen der synchronizedList und die Methode, die die List füllt.
Java:
private List<Point> list = Collections.synchronizedList(new ArrayList<Point>());
...
public void onData(double value, String measurement, String tags, String field, long timestamp) {
if (getOutputState() == ENUM_STATE_RUNNING) {
synchronized (list) {
Map<String, String> tag = new HashMap<>();
for (String tagElement : tags.split(",")) {
tag.put(tagElement.split(":")[0], tagElement.split(":")[1]);
}
Instant time = Instant.ofEpochSecond(timestamp);
Point point = Point.measurement(measurement).addTags(tag).addField(field, value).time(time,
WritePrecision.S);
list.add(point);
updateOutputQueuesize(list.size());
}
}
Der Aufruf zum hochladen in einem eigenen Thread:
Code:
public void upload() {
synchronized (list) {
logger.debug("Queue size: " + list.size());
if (list.size() != 0) {
logger.debug("Start Thread Upload");
Runnable r = new Upload(client, getPropertyBucket(), getPropertyOrganisation(), list);
// list.clear();
new Thread(r).start();
logger.debug("Start Thread Upload: DONE");
updateOutputQueuesize(list.size());
}
}
}
Die Uploadklasse selbst
Code:
public class Upload implements Runnable {
private InfluxDBClient client = null;
private String bucket;
private String org;
private List<Point> points;
private Logger logger = (Logger) LoggerFactory.getLogger(this.getClass());
public Upload(InfluxDBClient client, String bucket, String org, List<Point> points) {
this.bucket = bucket;
this.org = org;
this.client = client;
this.points = points;
this.logger.setLevel(Level.INFO);
}
@Override
public void run() {
try {
logger.debug("create InfluxDB Data: " + points.toString());
WriteApiBlocking writeApi = client.getWriteApiBlocking();
logger.debug("upload InfluxDB Data");
writeApi.writePoints(bucket, org, points);
logger.debug("upload InfluxDB Data: DONE");
points.clear();
} catch (Exception e) {
System.out.println(e.getLocalizedMessage());
}
}
public List<Point> getValue() {
return points;
}
}