Skip to content

Commit

Permalink
Akka.Cluster: improve gossip serialization performance (#7281)
Browse files Browse the repository at this point in the history
* Akka.Cluster: improve gossip serialization performance

* inline creation of addr and role indices

* fixed role mapping

* inlined mapping of `Proto.Msg.Member`

* inline serialization of addresses

* remove `allAddresses`

* cleaned up some gossip deserialization calls
  • Loading branch information
Aaronontheweb committed Jul 17, 2024
1 parent 9981d83 commit 5176dfb
Showing 1 changed file with 76 additions and 35 deletions.
111 changes: 76 additions & 35 deletions src/core/Akka.Cluster/Serialization/ClusterMessageSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -323,72 +323,113 @@ private static ClusterRouterPoolSettings ClusterRouterPoolSettingsFrom(Proto.Msg

private static Proto.Msg.Gossip GossipToProto(Gossip gossip)
{
var allMembers = gossip.Members.ToList();
var allAddresses = gossip.Members.Select(x => x.UniqueAddress).ToList();
var addressMapping = allAddresses.ZipWithIndex();
var allRoles = allMembers.Aggregate(ImmutableHashSet.Create<string>(), (set, member) => set.Union(member.Roles));
var roleMapping = allRoles.ZipWithIndex();
var allHashes = gossip.Version.Versions.Keys.Select(x => x.ToString()).ToList();
var hashMapping = allHashes.ZipWithIndex();
var allAppVersions = allMembers.Select(i => i.AppVersion.Version).ToImmutableHashSet();
var appVersionMapping = allAppVersions.ZipWithIndex();

int MapUniqueAddress(UniqueAddress address) => MapWithErrorMessage(addressMapping, address, "address");

int MapAppVersion(AppVersion appVersion) => MapWithErrorMessage(appVersionMapping, appVersion.Version, "appVersion");

Proto.Msg.Member MemberToProto(Member m)
var allMembers = gossip.Members;

// rather than call a bunch of individual LINQ operations, we're going to do it all in one go
var allRoles = new HashSet<string>();
var addressesToProto = new List<Proto.Msg.UniqueAddress>(gossip.Members.Count);
var allAppVersions = new HashSet<string>();
var addressMapping = new Dictionary<UniqueAddress, int>();
var addrIndex = 0;
var roleMapping = new Dictionary<string, int>();
var roleIndex = 0;
var membersProtos = new List<Proto.Msg.Member>(gossip.Members.Count);
var appVersionMapping = new Dictionary<string, int>();
var appVersionIndex = 0;

foreach (var m in allMembers)
{
var protoMember = new Proto.Msg.Member();
protoMember.AddressIndex = MapUniqueAddress(m.UniqueAddress);
protoMember.UpNumber = m.UpNumber;
protoMember.Status = (Proto.Msg.Member.Types.MemberStatus)m.Status;
protoMember.RolesIndexes.AddRange(m.Roles.Select(s => MapWithErrorMessage(roleMapping, s, "role")));
protoMember.AppVersionIndex = MapAppVersion(m.AppVersion);
return protoMember;
if (!addressMapping.ContainsKey(m.UniqueAddress))
{
addressMapping.Add(m.UniqueAddress, addrIndex);
addrIndex += 1;
}
addressesToProto.Add(UniqueAddressToProto(m.UniqueAddress));
var previousRoleCount = allRoles.Count;
allRoles.UnionWith(m.Roles);
if (allRoles.Count > previousRoleCount) // found a new role
{
foreach(var role in m.Roles)
{
// TODO: TryAdd would be nice here
if (roleMapping.ContainsKey(role)) continue;
roleMapping.Add(role, roleIndex);
roleIndex += 1;
}
}

allAppVersions.Add(m.AppVersion.Version);
if (!appVersionMapping.ContainsKey(m.AppVersion.Version))
{
appVersionMapping.Add(m.AppVersion.Version, appVersionIndex);
appVersionIndex += 1;
}


membersProtos.Add(MemberToProto(m));
}

//var addressMapping = allAddresses.ZipWithIndex();
//var roleMapping = allRoles.ZipWithIndex();
var allHashes = gossip.Version.Versions.Keys.Select(x => x.ToString()).ToArray();
var hashMapping = allHashes.ZipWithIndex();

var reachabilityProto = ReachabilityToProto(gossip.Overview.Reachability, addressMapping);
var membersProtos = gossip.Members.Select((Func<Member, Proto.Msg.Member>)MemberToProto);
//var membersProtos = gossip.Members.Select(c => MemberToProto(c));
var seenProtos = gossip.Overview.Seen.Select((Func<UniqueAddress, int>)MapUniqueAddress);

var overview = new Proto.Msg.GossipOverview();
overview.Seen.AddRange(seenProtos);
overview.ObserverReachability.AddRange(reachabilityProto);

var message = new Proto.Msg.Gossip();
message.AllAddresses.AddRange(allAddresses.Select(UniqueAddressToProto));
message.AllAddresses.AddRange(addressesToProto);
message.AllRoles.AddRange(allRoles);
message.AllHashes.AddRange(allHashes);
message.Members.AddRange(membersProtos);
message.Overview = overview;
message.Version = VectorClockToProto(gossip.Version, hashMapping);
message.AllAppVersions.AddRange(allAppVersions);
return message;

int MapAppVersion(AppVersion appVersion) => MapWithErrorMessage(appVersionMapping, appVersion.Version, "appVersion");

int MapUniqueAddress(UniqueAddress address) => MapWithErrorMessage(addressMapping, address, "address");

Proto.Msg.Member MemberToProto(Member m)
{
var protoMember = new Proto.Msg.Member();
protoMember.AddressIndex = MapUniqueAddress(m.UniqueAddress);
protoMember.UpNumber = m.UpNumber;
protoMember.Status = (Proto.Msg.Member.Types.MemberStatus)m.Status;
protoMember.RolesIndexes.AddRange(m.Roles.Select(s => MapWithErrorMessage(roleMapping, s, "role")));
protoMember.AppVersionIndex = MapAppVersion(m.AppVersion);
return protoMember;
}
}

private static Gossip GossipFrom(Proto.Msg.Gossip gossip)
{
var addressMapping = gossip.AllAddresses.Select(UniqueAddressFrom).ToList();
var roleMapping = gossip.AllRoles.ToList();
var hashMapping = gossip.AllHashes.ToList();
var roleMapping = gossip.AllRoles;
var hashMapping = gossip.AllHashes;
var appVersionMapping = gossip.AllAppVersions.Select(i => AppVersion.Create(i)).ToList();

var members = gossip.Members.Select((Func<Proto.Msg.Member, Member>)MemberFromProto).ToImmutableSortedSet(Member.Ordering);
var reachability = ReachabilityFromProto(gossip.Overview.ObserverReachability, addressMapping);
var seen = gossip.Overview.Seen.Select(x => addressMapping[x]).ToImmutableHashSet();
var overview = new GossipOverview(seen, reachability);

return new Gossip(members, overview, VectorClockFrom(gossip.Version, hashMapping));

Member MemberFromProto(Proto.Msg.Member member) =>
Member.Create(
addressMapping[member.AddressIndex],
member.UpNumber,
(MemberStatus)member.Status,
member.RolesIndexes.Select(x => roleMapping[x]).ToImmutableHashSet(),
appVersionMapping.Any() ? appVersionMapping[member.AppVersionIndex] : AppVersion.Zero
);

var members = gossip.Members.Select((Func<Proto.Msg.Member, Member>)MemberFromProto).ToImmutableSortedSet(Member.Ordering);
var reachability = ReachabilityFromProto(gossip.Overview.ObserverReachability, addressMapping);
var seen = gossip.Overview.Seen.Select(x => addressMapping[x]).ToImmutableHashSet();
var overview = new GossipOverview(seen, reachability);

return new Gossip(members, overview, VectorClockFrom(gossip.Version, hashMapping));
);
}

private static IEnumerable<Proto.Msg.ObserverReachability> ReachabilityToProto(Reachability reachability, Dictionary<UniqueAddress, int> addressMapping)
Expand Down Expand Up @@ -459,7 +500,7 @@ private static VectorClock VectorClockFrom(Proto.Msg.VectorClock version, IList<

private static int MapWithErrorMessage<T>(Dictionary<T, int> map, T value, string unknown)
{
if (map.TryGetValue(value, out int mapIndex))
if (map.TryGetValue(value, out var mapIndex))
return mapIndex;

throw new ArgumentException($"Unknown {unknown} [{value}] in cluster message");
Expand Down

0 comments on commit 5176dfb

Please sign in to comment.