fix buffer getting freed prematurely

This commit is contained in:
Quin Lynch 2023-06-09 15:02:37 -03:00
parent 9b8de019a6
commit 535bd54b23

View file

@ -229,7 +229,13 @@ public abstract class EdgeDBBinaryClient extends BaseEdgeDBClient {
return duplexer.duplexAndSync(args.toExecutePacket(), (result) -> {
switch (result.packet.getMessageType()) {
case DATA:
args.data.add(result.packet.as(Data.class));
var data = result.packet.as(Data.class);
assert data.payloadBuffer != null;
// retain the data buffer once, so it's available for the
// consumer of data, since after this duplex step, `Data` and
// its children (buffers) are freed.
data.payloadBuffer.retain();
args.data.add(data);
break;
case STATE_DATA_DESCRIPTION:
updateStateCodec(result, args);
@ -458,6 +464,9 @@ public abstract class EdgeDBBinaryClient extends BaseEdgeDBClient {
));
} catch (EdgeDBException | OperationNotSupportedException e) {
return CompletableFuture.failedFuture(e);
} finally {
// free the buffer
executeArgs.data.get(i).close();
}
}
@ -567,65 +576,72 @@ public abstract class EdgeDBBinaryClient extends BaseEdgeDBClient {
@SuppressWarnings("unchecked")
private <T> CompletionStage<T> deserializeSingleResult(Class<T> cls, ExecutionArguments args, Cardinality expected) {
switch (args.format) {
case JSON:
if(args.data.size() == 1) {
try {
return CompletableFuture.completedFuture(
Codec.deserializeFromBuffer(
(Codec<T>)args.codecs.outputCodec,
args.data.get(0).payloadBuffer,
this.codecContext
)
);
}
catch (Exception x) {
return CompletableFuture.failedFuture(x);
}
}
return CompletableFuture.completedFuture((T)"[]");
case BINARY:
switch (expected) {
case ONE:
case AT_MOST_ONE:
if(expected == Cardinality.ONE
? args.data.size() != 1
: args.data.size() > 1
) {
return CompletableFuture.failedFuture(
new ResultCardinalityMismatchException(
expected,
args.data.size() > 1
? Cardinality.MANY
: Cardinality.AT_MOST_ONE
try {
switch (args.format) {
case JSON:
if(args.data.size() == 1) {
try {
return CompletableFuture.completedFuture(
Codec.deserializeFromBuffer(
(Codec<T>)args.codecs.outputCodec,
args.data.get(0).payloadBuffer,
this.codecContext
)
);
}
catch (Exception x) {
return CompletableFuture.failedFuture(x);
}
}
try {
return CompletableFuture.completedFuture(
args.data.size() == 0
? null
: ObjectBuilder.buildResult(
return CompletableFuture.completedFuture((T)"[]");
case BINARY:
switch (expected) {
case ONE:
case AT_MOST_ONE:
if(expected == Cardinality.ONE
? args.data.size() != 1
: args.data.size() > 1
) {
return CompletableFuture.failedFuture(
new ResultCardinalityMismatchException(
expected,
args.data.size() > 1
? Cardinality.MANY
: Cardinality.AT_MOST_ONE
)
);
}
try {
return CompletableFuture.completedFuture(
args.data.size() == 0
? null
: ObjectBuilder.buildResult(
this,
args.codecs.outputCodec,
args.data.get(0),
cls
)
)
);
} catch (EdgeDBException | OperationNotSupportedException e) {
return CompletableFuture.failedFuture(e);
}
default:
return CompletableFuture.failedFuture(
new EdgeDBException("Unsupported cardinality result " + expected)
);
} catch (EdgeDBException | OperationNotSupportedException e) {
return CompletableFuture.failedFuture(e);
}
default:
return CompletableFuture.failedFuture(
new EdgeDBException("Unsupported cardinality result " + expected)
);
}
default:
return CompletableFuture.failedFuture(
new EdgeDBException("Unsupported IO format" + args.format)
);
}
default:
return CompletableFuture.failedFuture(
new EdgeDBException("Unsupported IO format" + args.format)
);
}
}
finally {
for(var data : args.data) {
data.close();
}
}
}