From 3bff564a4ffa6f43e4496871b628ea1bfaa4c568 Mon Sep 17 00:00:00 2001 From: Michael FIG Date: Tue, 31 Mar 2020 15:16:49 -0600 Subject: [PATCH] fix: have a generic IBCChannelHandler that takes ChannelTuples This way, a given block has a fresh channel handler, and the requests specify a tuple argument, so that they can be invoked later (after the block or transaction has passed) and are stable after restarts. --- packages/cosmic-swingset/lib/block-manager.js | 81 +++++++++++++++++-- .../cosmic-swingset/x/swingset/handler.go | 41 +++++++--- packages/cosmic-swingset/x/swingset/ibc.go | 61 +++----------- 3 files changed, 113 insertions(+), 70 deletions(-) diff --git a/packages/cosmic-swingset/lib/block-manager.js b/packages/cosmic-swingset/lib/block-manager.js index d563096eb5b..a8e89ab13de 100644 --- a/packages/cosmic-swingset/lib/block-manager.js +++ b/packages/cosmic-swingset/lib/block-manager.js @@ -1,4 +1,5 @@ import anylogger from 'anylogger'; +import stableStringify from '@agoric/swingset-vat/src/kernel/json-stable-stringify'; const log = anylogger('block-manager'); @@ -9,6 +10,13 @@ const COMMIT_BLOCK = 'COMMIT_BLOCK'; const IBC_PACKET = 'IBC_PACKET'; const IBC_TIMEOUT = 'IBC_TIMEOUT'; +// This works for both *intArray, string, and Buffer. +const getBytesToBase64 = data => Buffer.from(data).toString('base64'); + +// FIXME: use an immutable Uint8Array. +const getBase64ToBytes = data64 => + Uint8Array.from(Buffer.from(data64, 'base64')); + export default function makeBlockManager( { deliverInbound, @@ -22,6 +30,64 @@ export default function makeBlockManager( ) { let computedHeight = savedHeight; let runTime = 0; + let ibcHandlerPort = 0; + + const ibcTupleToChannel = new Map(); + const ibcChannelToPort = new Map(); + + const getIBCChannel = ({ + ibcHandlerPort: actionIbcHandlerPort, + tuple: rawTuple, + channelPort, + }) => { + // Update our handler port. + ibcHandlerPort = actionIbcHandlerPort; + + // Cache according to tuple (never changes during a connection)! + const tuple = stableStringify(rawTuple); + let ibcChannel = ibcTupleToChannel.get(tuple); + if (ibcChannel) { + ibcChannelToPort.set(ibcChannel, channelPort); + return ibcChannel; + } + + const sendToChannelHandler = msg => + sendToCosmosPort( + ibcHandlerPort, + JSON.stringify({ ...msg, tuple: rawTuple }), + ); + + ibcChannel = { + // Send a raw packet to this channel. + send(rawPacket) { + return sendToChannelHandler({ + method: 'send', + data64: getBytesToBase64(rawPacket), + }); + }, + // Ack the current packet. + ack(reply) { + return sendToChannelHandler({ + method: 'ack', + data64: getBytesToBase64(reply), + }); + }, + // Close the channel. + close() { + const ret = sendToChannelHandler({ + method: 'close', + }); + ibcChannelToPort.delete(ibcChannel); + ibcTupleToChannel.delete(tuple); + return ret; + }, + }; + + // Keep this channel associated with a tuple (which stays the same, even after restart). + ibcTupleToChannel.set(tuple, ibcChannel); + return ibcChannel; + }; + async function kernelPerformAction(action) { const start = Date.now(); const finish = _ => (runTime += Date.now() - start); @@ -43,14 +109,13 @@ export default function makeBlockManager( break; case IBC_PACKET: { - // FIXME: We just ack and disconnect. - sendToCosmosPort(action.channelPort, { - method: 'ack', - data64: Buffer.from(JSON.stringify(action)).toString('base64'), - }); - sendToCosmosPort(action.channelPort, { - method: 'close', - }); + console.error(`FIXME: Got IBC packet; just pingpong`, action); + const ibcChannel = getIBCChannel(action); + + // FIXME: We just ack, send, and disconnect. + ibcChannel.ack(JSON.stringify(action)); + ibcChannel.send(`pong:${JSON.stringify(action)}`); + ibcChannel.close(); break; } diff --git a/packages/cosmic-swingset/x/swingset/handler.go b/packages/cosmic-swingset/x/swingset/handler.go index 1fcac2209fd..7969575c5cd 100644 --- a/packages/cosmic-swingset/x/swingset/handler.go +++ b/packages/cosmic-swingset/x/swingset/handler.go @@ -16,12 +16,13 @@ import ( ) type ibcPacketAction struct { - Type string `json:"type"` - Packet channeltypes.Packet `json:"packet"` - ChannelPort int `json:"channelPort"` - StoragePort int `json:"storagePort"` - BlockHeight int64 `json:"blockHeight"` - BlockTime int64 `json:"blockTime"` + Type string `json:"type"` + Packet channeltypes.Packet `json:"packet"` + Tuple ChannelTuple `json:"tuple"` + IBCHandlerPort int `json:"ibcHandlerPort"` + StoragePort int `json:"storagePort"` + BlockHeight int64 `json:"blockHeight"` + BlockTime int64 `json:"blockTime"` } type deliverInboundAction struct { @@ -73,15 +74,29 @@ func handleIBCPacket(ctx sdk.Context, keeper Keeper, actionType string, packet c // The channel lifetime is longer than just one message. pkt := packet.(channeltypes.Packet) - channelPort := GetIBCChannelPortHandler(ctx, keeper, pkt) + + ibcHandlerPort := RegisterPortHandler(NewIBCChannelHandler(ctx, keeper, &pkt)) + defer UnregisterPortHandler(ibcHandlerPort) + + tuple := ChannelTuple{ + Source: ChannelEndpoint{ + Channel: pkt.SourceChannel, + Port: pkt.SourcePort, + }, + Destination: ChannelEndpoint{ + Channel: pkt.DestinationChannel, + Port: pkt.DestinationPort, + }, + } action := &ibcPacketAction{ - Type: actionType, - Packet: pkt, - StoragePort: storagePort, - ChannelPort: channelPort, - BlockHeight: ctx.BlockHeight(), - BlockTime: ctx.BlockTime().Unix(), + Type: actionType, + Packet: pkt, + Tuple: tuple, + StoragePort: storagePort, + IBCHandlerPort: ibcHandlerPort, + BlockHeight: ctx.BlockHeight(), + BlockTime: ctx.BlockTime().Unix(), } // fmt.Fprintf(os.Stderr, "Context is %+v\n", ctx) diff --git a/packages/cosmic-swingset/x/swingset/ibc.go b/packages/cosmic-swingset/x/swingset/ibc.go index 8b511858237..809668e9749 100644 --- a/packages/cosmic-swingset/x/swingset/ibc.go +++ b/packages/cosmic-swingset/x/swingset/ibc.go @@ -12,27 +12,25 @@ import ( // FIXME: How to tell the caller when this is a new channel? -type channelEndpoint struct { +type ChannelEndpoint struct { Port string `json:"port"` Channel string `json:"channel"` } -type channelTuple struct { - Destination channelEndpoint - Source channelEndpoint +type ChannelTuple struct { + Destination ChannelEndpoint `json:"dst"` + Source ChannelEndpoint `json:"src"` } type channelHandler struct { Keeper Keeper Context sdk.Context - Destination channelEndpoint CurrentPacket *channeltypes.Packet - ChannelPort int - Tuple channelTuple } type channelMessage struct { Method string `json:"method"` + Tuple ChannelTuple `json:"tuple"` Data64 string `json:"data64"` } @@ -45,42 +43,11 @@ func (cm channelMessage) GetData() []byte { return data } -var channelHandlers map[channelTuple]*channelHandler - -func init() { - channelHandlers = make(map[channelTuple]*channelHandler) -} - -func GetIBCChannelPortHandler(ctx sdk.Context, keeper Keeper, packet channeltypes.Packet) int { - tuple := channelTuple { - Destination: channelEndpoint{ - Port: packet.DestinationPort, - Channel: packet.DestinationChannel, - }, - Source: channelEndpoint{ - Port: packet.SourcePort, - Channel: packet.SourceChannel, - }, - }; - - // lookup existing channel based on the connection tuple - if ch, ok := channelHandlers[tuple]; ok { - ch.CurrentPacket = &packet - return ch.ChannelPort - } - - ch := NewChannelHandler(ctx, keeper, tuple) - ch.CurrentPacket = &packet - ch.ChannelPort = RegisterPortHandler(ch) - channelHandlers[tuple] = ch - return ch.ChannelPort -} - -func NewChannelHandler(ctx sdk.Context, keeper Keeper, tuple channelTuple) *channelHandler { +func NewIBCChannelHandler(ctx sdk.Context, keeper Keeper, packet *channeltypes.Packet) *channelHandler { return &channelHandler{ Context: ctx, Keeper: keeper, - Tuple: tuple, + CurrentPacket: packet, } } @@ -107,11 +74,7 @@ func (ch *channelHandler) Receive(str string) (ret string, err error) { case "close": // Make sure our port goes away. - defer func() { - UnregisterPortHandler(ch.ChannelPort) - delete(channelHandlers, ch.Tuple) - }() - if err = ch.Keeper.ChanCloseInit(ch.Context, ch.Tuple.Destination.Port, ch.Tuple.Destination.Channel); err != nil { + if err = ch.Keeper.ChanCloseInit(ch.Context, msg.Tuple.Destination.Port, msg.Tuple.Destination.Channel); err != nil { return "", err } return "true", nil @@ -119,8 +82,8 @@ func (ch *channelHandler) Receive(str string) (ret string, err error) { case "send": seq, ok := ch.Keeper.GetNextSequenceSend( ch.Context, - ch.Tuple.Destination.Channel, - ch.Tuple.Destination.Port, + msg.Tuple.Destination.Channel, + msg.Tuple.Destination.Port, ) if !ok { return "", fmt.Errorf("unknown sequence number") @@ -131,8 +94,8 @@ func (ch *channelHandler) Receive(str string) (ret string, err error) { packet := channeltypes.NewPacket( msg.GetData(), seq, - ch.Tuple.Source.Port, ch.Tuple.Source.Channel, - ch.Tuple.Destination.Port, ch.Tuple.Destination.Channel, + msg.Tuple.Source.Port, msg.Tuple.Source.Channel, + msg.Tuple.Destination.Port, msg.Tuple.Destination.Channel, uint64(ch.Context.BlockHeight() + blockTimeout), ) if err := ch.Keeper.SendPacket(ch.Context, packet); err != nil{