Skip to content

Commit

Permalink
fix: have a generic IBCChannelHandler that takes ChannelTuples
Browse files Browse the repository at this point in the history
This way, a given block has a fresh channel handler, and the requests
specify a tuple argument, so that they can be invoked later
(after the block or transaction has passed) and are stable after
restarts.
  • Loading branch information
michaelfig committed Apr 1, 2020
1 parent 88257f8 commit 3bff564
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 70 deletions.
81 changes: 73 additions & 8 deletions packages/cosmic-swingset/lib/block-manager.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import anylogger from 'anylogger';
import stableStringify from '@agoric/swingset-vat/src/kernel/json-stable-stringify';

const log = anylogger('block-manager');

Expand All @@ -9,6 +10,13 @@ const COMMIT_BLOCK = 'COMMIT_BLOCK';
const IBC_PACKET = 'IBC_PACKET';
const IBC_TIMEOUT = 'IBC_TIMEOUT';

// This works for both *intArray, string, and Buffer.
const getBytesToBase64 = data => Buffer.from(data).toString('base64');

// FIXME: use an immutable Uint8Array.
const getBase64ToBytes = data64 =>
Uint8Array.from(Buffer.from(data64, 'base64'));

export default function makeBlockManager(
{
deliverInbound,
Expand All @@ -22,6 +30,64 @@ export default function makeBlockManager(
) {
let computedHeight = savedHeight;
let runTime = 0;
let ibcHandlerPort = 0;

const ibcTupleToChannel = new Map();
const ibcChannelToPort = new Map();

const getIBCChannel = ({
ibcHandlerPort: actionIbcHandlerPort,
tuple: rawTuple,
channelPort,
}) => {
// Update our handler port.
ibcHandlerPort = actionIbcHandlerPort;

// Cache according to tuple (never changes during a connection)!
const tuple = stableStringify(rawTuple);
let ibcChannel = ibcTupleToChannel.get(tuple);
if (ibcChannel) {
ibcChannelToPort.set(ibcChannel, channelPort);
return ibcChannel;
}

const sendToChannelHandler = msg =>
sendToCosmosPort(
ibcHandlerPort,
JSON.stringify({ ...msg, tuple: rawTuple }),
);

ibcChannel = {
// Send a raw packet to this channel.
send(rawPacket) {
return sendToChannelHandler({
method: 'send',
data64: getBytesToBase64(rawPacket),
});
},
// Ack the current packet.
ack(reply) {
return sendToChannelHandler({
method: 'ack',
data64: getBytesToBase64(reply),
});
},
// Close the channel.
close() {
const ret = sendToChannelHandler({
method: 'close',
});
ibcChannelToPort.delete(ibcChannel);
ibcTupleToChannel.delete(tuple);
return ret;
},
};

// Keep this channel associated with a tuple (which stays the same, even after restart).
ibcTupleToChannel.set(tuple, ibcChannel);
return ibcChannel;
};

async function kernelPerformAction(action) {
const start = Date.now();
const finish = _ => (runTime += Date.now() - start);
Expand All @@ -43,14 +109,13 @@ export default function makeBlockManager(
break;

case IBC_PACKET: {
// FIXME: We just ack and disconnect.
sendToCosmosPort(action.channelPort, {
method: 'ack',
data64: Buffer.from(JSON.stringify(action)).toString('base64'),
});
sendToCosmosPort(action.channelPort, {
method: 'close',
});
console.error(`FIXME: Got IBC packet; just pingpong`, action);
const ibcChannel = getIBCChannel(action);

// FIXME: We just ack, send, and disconnect.
ibcChannel.ack(JSON.stringify(action));
ibcChannel.send(`pong:${JSON.stringify(action)}`);
ibcChannel.close();
break;
}

Expand Down
41 changes: 28 additions & 13 deletions packages/cosmic-swingset/x/swingset/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@ import (
)

type ibcPacketAction struct {
Type string `json:"type"`
Packet channeltypes.Packet `json:"packet"`
ChannelPort int `json:"channelPort"`
StoragePort int `json:"storagePort"`
BlockHeight int64 `json:"blockHeight"`
BlockTime int64 `json:"blockTime"`
Type string `json:"type"`
Packet channeltypes.Packet `json:"packet"`
Tuple ChannelTuple `json:"tuple"`
IBCHandlerPort int `json:"ibcHandlerPort"`
StoragePort int `json:"storagePort"`
BlockHeight int64 `json:"blockHeight"`
BlockTime int64 `json:"blockTime"`
}

type deliverInboundAction struct {
Expand Down Expand Up @@ -73,15 +74,29 @@ func handleIBCPacket(ctx sdk.Context, keeper Keeper, actionType string, packet c

// The channel lifetime is longer than just one message.
pkt := packet.(channeltypes.Packet)
channelPort := GetIBCChannelPortHandler(ctx, keeper, pkt)

ibcHandlerPort := RegisterPortHandler(NewIBCChannelHandler(ctx, keeper, &pkt))
defer UnregisterPortHandler(ibcHandlerPort)

tuple := ChannelTuple{
Source: ChannelEndpoint{
Channel: pkt.SourceChannel,
Port: pkt.SourcePort,
},
Destination: ChannelEndpoint{
Channel: pkt.DestinationChannel,
Port: pkt.DestinationPort,
},
}

action := &ibcPacketAction{
Type: actionType,
Packet: pkt,
StoragePort: storagePort,
ChannelPort: channelPort,
BlockHeight: ctx.BlockHeight(),
BlockTime: ctx.BlockTime().Unix(),
Type: actionType,
Packet: pkt,
Tuple: tuple,
StoragePort: storagePort,
IBCHandlerPort: ibcHandlerPort,
BlockHeight: ctx.BlockHeight(),
BlockTime: ctx.BlockTime().Unix(),
}

// fmt.Fprintf(os.Stderr, "Context is %+v\n", ctx)
Expand Down
61 changes: 12 additions & 49 deletions packages/cosmic-swingset/x/swingset/ibc.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,27 +12,25 @@ import (

// FIXME: How to tell the caller when this is a new channel?

type channelEndpoint struct {
type ChannelEndpoint struct {
Port string `json:"port"`
Channel string `json:"channel"`
}

type channelTuple struct {
Destination channelEndpoint
Source channelEndpoint
type ChannelTuple struct {
Destination ChannelEndpoint `json:"dst"`
Source ChannelEndpoint `json:"src"`
}

type channelHandler struct {
Keeper Keeper
Context sdk.Context
Destination channelEndpoint
CurrentPacket *channeltypes.Packet
ChannelPort int
Tuple channelTuple
}

type channelMessage struct {
Method string `json:"method"`
Tuple ChannelTuple `json:"tuple"`
Data64 string `json:"data64"`
}

Expand All @@ -45,42 +43,11 @@ func (cm channelMessage) GetData() []byte {
return data
}

var channelHandlers map[channelTuple]*channelHandler

func init() {
channelHandlers = make(map[channelTuple]*channelHandler)
}

func GetIBCChannelPortHandler(ctx sdk.Context, keeper Keeper, packet channeltypes.Packet) int {
tuple := channelTuple {
Destination: channelEndpoint{
Port: packet.DestinationPort,
Channel: packet.DestinationChannel,
},
Source: channelEndpoint{
Port: packet.SourcePort,
Channel: packet.SourceChannel,
},
};

// lookup existing channel based on the connection tuple
if ch, ok := channelHandlers[tuple]; ok {
ch.CurrentPacket = &packet
return ch.ChannelPort
}

ch := NewChannelHandler(ctx, keeper, tuple)
ch.CurrentPacket = &packet
ch.ChannelPort = RegisterPortHandler(ch)
channelHandlers[tuple] = ch
return ch.ChannelPort
}

func NewChannelHandler(ctx sdk.Context, keeper Keeper, tuple channelTuple) *channelHandler {
func NewIBCChannelHandler(ctx sdk.Context, keeper Keeper, packet *channeltypes.Packet) *channelHandler {
return &channelHandler{
Context: ctx,
Keeper: keeper,
Tuple: tuple,
CurrentPacket: packet,
}
}

Expand All @@ -107,20 +74,16 @@ func (ch *channelHandler) Receive(str string) (ret string, err error) {

case "close":
// Make sure our port goes away.
defer func() {
UnregisterPortHandler(ch.ChannelPort)
delete(channelHandlers, ch.Tuple)
}()
if err = ch.Keeper.ChanCloseInit(ch.Context, ch.Tuple.Destination.Port, ch.Tuple.Destination.Channel); err != nil {
if err = ch.Keeper.ChanCloseInit(ch.Context, msg.Tuple.Destination.Port, msg.Tuple.Destination.Channel); err != nil {
return "", err
}
return "true", nil

case "send":
seq, ok := ch.Keeper.GetNextSequenceSend(
ch.Context,
ch.Tuple.Destination.Channel,
ch.Tuple.Destination.Port,
msg.Tuple.Destination.Channel,
msg.Tuple.Destination.Port,
)
if !ok {
return "", fmt.Errorf("unknown sequence number")
Expand All @@ -131,8 +94,8 @@ func (ch *channelHandler) Receive(str string) (ret string, err error) {

packet := channeltypes.NewPacket(
msg.GetData(), seq,
ch.Tuple.Source.Port, ch.Tuple.Source.Channel,
ch.Tuple.Destination.Port, ch.Tuple.Destination.Channel,
msg.Tuple.Source.Port, msg.Tuple.Source.Channel,
msg.Tuple.Destination.Port, msg.Tuple.Destination.Channel,
uint64(ch.Context.BlockHeight() + blockTimeout),
)
if err := ch.Keeper.SendPacket(ch.Context, packet); err != nil{
Expand Down

0 comments on commit 3bff564

Please sign in to comment.