Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Varint support in serializers #143

Merged
merged 7 commits into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

public abstract class AbstractBytesOut implements AutoCloseable, BytesOut {

protected static final int MAX_VAR_INT_BYTES = 5;
private static final int MAXIMUM_RECURSION_DEPTH = SystemPropertyUtil.getInt("ulyp.recorder.max-recursion", 3);
private static final int MAX_STRING_LENGTH = SystemPropertyUtil.getInt("ulyp.recorder.max-string-length", 200);

Expand All @@ -32,19 +33,19 @@ public void write(String value) {
}

byte[] bytes = toPrint.getBytes(StandardCharsets.UTF_8);
write(bytes.length);
writeVarInt(bytes.length);
for (byte b : bytes) {
write(b);
}
} else {
write(-1);
writeVarInt(-1);
}
}

public void write(Object object, TypeResolver typeResolver) throws Exception {
try (BytesOut nestedOut = nest()) {
Type itemType = typeResolver.get(object);
write(itemType.getId());
writeVarInt(itemType.getId());
ObjectRecorder recorder;
if (object != null) {
// Simply stop recursively write objects if it's too deep
Expand Down
13 changes: 11 additions & 2 deletions ulyp-common/src/main/java/com/ulyp/core/bytes/BufferBytesOut.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,15 @@ public void write(int value) {
position += Integer.BYTES;
}

public void writeVarInt(int v) {
do {
int bits = v & 0x7F;
v >>>= 7;
byte b = (byte) (bits + ((v != 0) ? 0x80 : 0));
write(b);
} while (v != 0);
}

public void write(long value) {
buffer.putLong(position, value);
position += Long.BYTES;
Expand All @@ -91,13 +100,13 @@ public DirectBuffer copy() {

@Override
public void write(DirectBuffer buffer) {
write(buffer.capacity());
writeVarInt(buffer.capacity());
this.buffer.putBytes(position, buffer, 0, buffer.capacity());
position += buffer.capacity();
}

public void write(byte[] bytes) {
write(bytes.length);
writeVarInt(bytes.length);
buffer.putBytes(position, bytes);
position += bytes.length;
}
Expand Down
2 changes: 2 additions & 0 deletions ulyp-common/src/main/java/com/ulyp/core/bytes/BytesIn.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ public interface BytesIn {

int readInt();

int readVarInt();

int readInt(int offset);

char readChar();
Expand Down
2 changes: 2 additions & 0 deletions ulyp-common/src/main/java/com/ulyp/core/bytes/BytesOut.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ public interface BytesOut extends AutoCloseable {

void write(int value);

void writeVarInt(int v);

void write(long value);

void write(byte c);
Expand Down
37 changes: 34 additions & 3 deletions ulyp-common/src/main/java/com/ulyp/core/bytes/DirectBytesIn.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,37 @@ public int readInt() {
return val;
}

public int readVarInt() {
byte tmp;
if ((tmp = readByte()) >= 0) {
return tmp;
}
int result = tmp & 0x7f;
if ((tmp = readByte()) >= 0) {
result |= tmp << 7;
} else {
result |= (tmp & 0x7f) << 7;
if ((tmp = readByte()) >= 0) {
result |= tmp << 14;
} else {
result |= (tmp & 0x7f) << 14;
if ((tmp = readByte()) >= 0) {
result |= tmp << 21;
} else {
result |= (tmp & 0x7f) << 21;
result |= (tmp = readByte()) << 28;
while (tmp < 0) {
// We get into this loop only in the case of overflow.
// By doing this, we can call getVarInt() instead of
// getVarLong() when we only need an int.
tmp = readByte();
}
}
}
}
return result;
}

@Override
public int readInt(int offset) {
return buffer.getInt(offset);
Expand All @@ -72,7 +103,7 @@ public long readLong() {

@Override
public BytesIn readBytes() {
int length = readInt();
int length = readVarInt();
UnsafeBuffer newBuf = new UnsafeBuffer();
newBuf.wrap(buffer, pos, length);
pos += length;
Expand All @@ -88,7 +119,7 @@ public BytesIn readBytes(int offset, int length) {

@Override
public ObjectRecord readObject(ByIdTypeResolver typeResolver) {
Type itemClassType = typeResolver.getType(readInt());
Type itemClassType = typeResolver.getType(readVarInt());
ObjectRecorder recorder = ObjectRecorderRegistry.recorderForId(readByte());
return recorder.read(itemClassType, this, typeResolver);
}
Expand All @@ -110,7 +141,7 @@ public int readIntAt(int offset) {

@Override
public String readString() {
int length = readInt();
int length = readVarInt();
if (length >= 0) {
byte[] buf = new byte[length];
this.buffer.getBytes(pos, buf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,17 @@ public void write(int value) {
addWrittenBytes(Integer.BYTES);
}

@Override
public void writeVarInt(int v) {
// TODO 99..99999% writes hit same page, optimize this
do {
int bits = v & 0x7F;
v >>>= 7;
byte b = (byte) (bits + ((v != 0) ? 0x80 : 0));
write(b);
} while (v != 0);
}

public void write(long value) {
MemPage page = currentPage();
int remainingBytes = currentPageRemainingBytes();
Expand Down Expand Up @@ -179,7 +190,7 @@ public DirectBuffer copy() {
@Override
public void write(DirectBuffer buffer) {
int bytesLength = buffer.capacity();
write(bytesLength);
writeVarInt(bytesLength);
int offset = 0;
while (bytesLength > 0) {
MemPage page = currentPage();
Expand All @@ -194,7 +205,7 @@ public void write(DirectBuffer buffer) {

public void write(byte[] value) {
int bytesLength = value.length;
write(bytesLength);
writeVarInt(bytesLength);
int offset = 0;
while (bytesLength > 0) {
MemPage page = currentPage();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,11 @@ public void write(int value) {
bytesOut.write(value);
}

@Override
public void writeVarInt(int v) {
bytesOut.writeVarInt(v);
}

public void write(long value) {
bytesOut.write(value);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ public boolean supportsAsyncRecording() {

@Override
public ObjectRecord read(@NotNull Type objectType, BytesIn input, ByIdTypeResolver typeResolver) {
return new ClassObjectRecord(objectType, typeResolver.getType(input.readInt()));
return new ClassObjectRecord(objectType, typeResolver.getType(input.readVarInt()));
}

@Override
public void write(Object object, BytesOut out, TypeResolver typeResolver) throws Exception {
Class<?> clazz = (Class<?>) object;

int typeId = typeResolver.get(clazz).getId();
out.write(typeId);
out.writeVarInt(typeId);

if (LoggingSettings.TRACE_ENABLED) {
log.trace("Writing typeId={} for {}", typeId, object);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ public boolean supports(Class<?> type) {
@Override
public ObjectRecord read(@NotNull Type type, BytesIn input, ByIdTypeResolver typeResolver) {
IdentityObjectRecord identityRecord = (IdentityObjectRecord) ObjectRecorderRegistry.IDENTITY_RECORDER.getInstance().read(type, input, typeResolver);
return new ByteArrayRecord(type, identityRecord, input.readInt());
return new ByteArrayRecord(type, identityRecord, input.readVarInt());
}

@Override
public void write(Object object, BytesOut out, TypeResolver typeResolver) throws Exception {
ObjectRecorderRegistry.IDENTITY_RECORDER.getInstance().write(object, out, typeResolver);
byte[] array = (byte[]) object;
out.write(array.length);
out.writeVarInt(array.length);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ public boolean supports(Class<?> type) {

@Override
public ObjectRecord read(@NotNull Type type, BytesIn input, ByIdTypeResolver typeResolver) {
int arrayLength = input.readInt();
int recordedItemsCount = input.readInt();
int arrayLength = input.readVarInt();
int recordedItemsCount = input.readVarInt();
List<ObjectRecord> items = new ArrayList<>(recordedItemsCount);
for (int i = 0; i < recordedItemsCount; i++) {
items.add(input.readObject(typeResolver));
Expand All @@ -46,9 +46,9 @@ public ObjectRecord read(@NotNull Type type, BytesIn input, ByIdTypeResolver typ
public void write(Object object, BytesOut out, TypeResolver typeResolver) throws Exception {
Object[] array = (Object[]) object;
int length = array.length;
out.write(length);
out.writeVarInt(length);
int itemsToRecord = Math.min(3, length);
out.write(itemsToRecord);
out.writeVarInt(itemsToRecord);

for (int i = 0; i < itemsToRecord; i++) {
out.write(array[i], typeResolver);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
public class MapRecorder extends ObjectRecorder {

public static final int MAX_ITEMS_TO_RECORD = SystemPropertyUtil.getInt("ulyp.recorder.map.items", 3);
private static final int RECORDED_ITEMS_FLAG = 1;
private static final int RECORDED_IDENTITY_ONLY = 0;
private static final byte RECORDED_ITEMS_FLAG = 1;
private static final byte RECORDED_IDENTITY_ONLY = 0;
@Setter
private volatile CollectionsRecordingMode mode = CollectionsRecordingMode.NONE;
private volatile boolean active = true;
Expand All @@ -43,11 +43,11 @@ public boolean supportsAsyncRecording() {

@Override
public ObjectRecord read(@NotNull Type type, BytesIn input, ByIdTypeResolver typeResolver) {
int recordedItems = input.readInt();
byte recordedItems = input.readByte();

if (recordedItems == RECORDED_ITEMS_FLAG) {
int collectionSize = input.readInt();
int recordedItemsCount = input.readInt();
int collectionSize = input.readVarInt();
int recordedItemsCount = input.readVarInt();
List<MapEntryRecord> entries = new ArrayList<>();
for (int i = 0; i < recordedItemsCount; i++) {
ObjectRecord key = input.readObject(typeResolver);
Expand All @@ -69,9 +69,9 @@ public void write(Object object, BytesOut nout, TypeResolver typeResolver) throw
try {
Map<?, ?> collection = (Map<?, ?>) object;
int length = collection.size();
out.write(length);
out.writeVarInt(length);
int itemsToRecord = Math.min(MAX_ITEMS_TO_RECORD, length);
out.write(itemsToRecord);
out.writeVarInt(itemsToRecord);
Iterator<? extends Map.Entry<?, ?>> iterator = collection.entrySet().iterator();
int recorded = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ public class RecordedEnterMethodCallSerializer {

public void serializeEnterMethodCall(BytesOut out, int callId, int methodId, TypeResolver typeResolver, Object callee, Object[] args, long nanoTime) {
out.write(ENTER_METHOD_CALL_ID);
out.write(callId);
out.write(methodId);
out.writeVarInt(callId);
out.writeVarInt(methodId);
out.write(nanoTime);
serializeArgs(out, typeResolver, args);
serializeCallee(out, typeResolver, callee);
Expand All @@ -33,7 +33,7 @@ private static void serializeCallee(BytesOut out, TypeResolver typeResolver, Obj

ObjectRecorder recorder = callee instanceof QueuedIdentityObject ? ObjectRecorderRegistry.QUEUE_IDENTITY_RECORDER.getInstance() : ObjectRecorderRegistry.IDENTITY_RECORDER.getInstance();

out.write(typeResolver.get(callee).getId());
out.writeVarInt(typeResolver.get(callee).getId());
out.write(recorder.getId());
try {
recorder.write(callee, out, typeResolver);
Expand All @@ -42,7 +42,7 @@ private static void serializeCallee(BytesOut out, TypeResolver typeResolver, Obj
}
} else {
ObjectRecorder recorder = ObjectRecorderRegistry.NULL_RECORDER.getInstance();
out.write(Type.unknown().getId());
out.writeVarInt(Type.unknown().getId());
out.write(recorder.getId());
try {
recorder.write(null, out, typeResolver);
Expand All @@ -53,7 +53,7 @@ private static void serializeCallee(BytesOut out, TypeResolver typeResolver, Obj
}

private static void serializeArgs(BytesOut out, TypeResolver typeResolver, Object[] args) {
out.write(args.length);
out.writeVarInt(args.length);
for (int argIndex = 0; argIndex < args.length; argIndex++) {
Object argValue = args[argIndex];
Type argType = typeResolver.get(argValue);
Expand All @@ -65,7 +65,7 @@ private static void serializeArgs(BytesOut out, TypeResolver typeResolver, Objec

ObjectRecorder recorder = argValue != null ? recorderHint : ObjectRecorderRegistry.NULL_RECORDER.getInstance();

out.write(argType.getId());
out.writeVarInt(argType.getId());
out.write(recorder.getId());
try {
recorder.write(argValue, out, typeResolver);
Expand All @@ -76,7 +76,7 @@ private static void serializeArgs(BytesOut out, TypeResolver typeResolver, Objec
}

private static ObjectRecord deserializeObject(BytesIn input, ReadableRepository<Integer, Type> typeResolver) {
int typeId = input.readInt();
int typeId = input.readVarInt();
byte recorderId = input.readByte();
Type type = Optional.ofNullable(typeResolver.get(typeId)).orElse(Type.unknown());
ObjectRecorder objectRecorder = ObjectRecorderRegistry.recorderForId(recorderId);
Expand All @@ -88,10 +88,10 @@ private static ObjectRecord deserializeObject(BytesIn input, ReadableRepository<
}

public static RecordedEnterMethodCall deserialize(BytesIn input, ReadableRepository<Integer, Type> typeResolver) {
int callId = input.readInt();
int methodId = input.readInt();
int callId = input.readVarInt();
int methodId = input.readVarInt();
long nanoTime = input.readLong();
int argsCount = input.readInt();
int argsCount = input.readVarInt();

List<ObjectRecord> arguments = new ArrayList<>(argsCount);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ public class RecordedExitMethodCallSerializer {

public void serializeExitMethodCall(BytesOut out, int callId, TypeResolver typeResolver, boolean thrown, Object returnValue, long nanoTime) {
out.write(EXIT_METHOD_CALL_ID);
out.write(callId);
out.writeVarInt(callId);
out.write(thrown);
out.write(nanoTime);

Type type = typeResolver.get(returnValue);
out.write(type.getId());
out.writeVarInt(type.getId());

ObjectRecorder recorderHint = type.getRecorderHint();
if (returnValue != null && recorderHint == null) {
Expand All @@ -48,7 +48,7 @@ public void serializeExitMethodCall(BytesOut out, int callId, TypeResolver typeR
}

private static ObjectRecord deserializeObject(BytesIn input, ReadableRepository<Integer, Type> typeResolver) {
int typeId = input.readInt();
int typeId = input.readVarInt();
byte recorderId = input.readByte();
Type type = Optional.ofNullable(typeResolver.get(typeId)).orElse(Type.unknown());
ObjectRecorder objectRecorder = ObjectRecorderRegistry.recorderForId(recorderId);
Expand All @@ -61,7 +61,7 @@ private static ObjectRecord deserializeObject(BytesIn input, ReadableRepository<

public static RecordedExitMethodCall deserialize(BytesIn input, ReadableRepository<Integer, Type> typeResolver) {

int callId = input.readInt();
int callId = input.readVarInt();
boolean thrown = input.readBoolean();
long nanoTime = input.readLong();

Expand Down
Loading