Skip to content

Commit

Permalink
feat(node): NET-1252 OperatorPlugin registers RPC method (#2702)
Browse files Browse the repository at this point in the history
## Summary

OperatorPlugin now registers the RPC method that can be used to discover
which operators are in which partitions.

---------

Co-authored-by: Eric Andrews <eric.andrews@streamr.com>
  • Loading branch information
juslesan and harbu committed Aug 7, 2024
1 parent 7923070 commit 0d1215d
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 3 deletions.
20 changes: 19 additions & 1 deletion packages/node/src/plugins/operator/OperatorPlugin.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
import { ReviewRequestEvent, SignerWithProvider, StreamrClient } from '@streamr/sdk'
import {
OperatorDiscoveryRequest,
OperatorDiscoveryResponse,
peerDescriptorTranslator,
ReviewRequestEvent,
SignerWithProvider,
StreamrClient
} from '@streamr/sdk'
import {
EthereumAddress,
Logger,
StreamPartIDUtils,
addManagedEventListener,
scheduleAtInterval,
setAbortableInterval,
Expand Down Expand Up @@ -106,6 +114,16 @@ export class OperatorPlugin extends Plugin<OperatorPluginConfig> {
await fleetState.start()
await maintainTopologyHelper.start()

const networkNode = await streamrClient.getNode()

const rpcServerFunction = async (request: OperatorDiscoveryRequest) => {
const streamPartId = StreamPartIDUtils.parse(request.streamPartId)
const operators = streamPartAssignments.getAssignedNodesForStreamPart(streamPartId)
return OperatorDiscoveryResponse.create({ operators: operators.map((operator) => peerDescriptorTranslator(operator)) })
}

await networkNode.registerExternalRpcMethod(OperatorDiscoveryRequest, OperatorDiscoveryResponse, 'discoverOperators', rpcServerFunction)

this.abortController.signal.addEventListener('abort', async () => {
await fleetState.destroy()
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ import {
_operatorContractUtils
} from '@streamr/sdk'
import { fastPrivateKey, fetchPrivateKeyWithGas } from '@streamr/test-utils'
import { collect, waitForCondition } from '@streamr/utils'
import { collect, waitForCondition, StreamPartIDUtils, EthereumAddress, toEthereumAddress } from '@streamr/utils'
import { Wallet } from 'ethers'
import { Broker, createBroker } from '../../../../src/broker'
import { createClient, createTestStream, formConfig, startBroker } from '../../../utils'
import { formCoordinationStreamId } from '../../../../src/plugins/operator/formCoordinationStreamId'

const {
delegate,
Expand Down Expand Up @@ -39,6 +40,13 @@ describe('OperatorPlugin', () => {
await broker?.stop()
})

async function waitForHeartbeatMessage(operatorContractAddress: EthereumAddress): Promise<void> {
const client = createClient(fastPrivateKey())
const sub = await client.subscribe(formCoordinationStreamId(operatorContractAddress))
await collect(sub, 1)
await client?.destroy()
}

it('accepts proxy connections', async () => {
const subscriber = createClient(await fetchPrivateKeyWithGas())
const stream = await createTestStream(subscriber, module)
Expand Down Expand Up @@ -93,4 +101,32 @@ describe('OperatorPlugin', () => {
await createBroker(config)
}).rejects.toThrow('Plugin operator doesn\'t support client config value "false" in network.node.acceptProxyConnections')
})

it('accepts OperatorDiscoveryRequests', async () => {
const client = createClient(await fetchPrivateKeyWithGas())
const stream = await createTestStream(client, module)

const sponsorer = await generateWalletWithGasAndTokens()
const sponsorship1 = await deploySponsorshipContract({ streamId: stream.id, deployer: sponsorer })
await sponsor(sponsorer, await sponsorship1.getAddress(), 10000)
await delegate(operatorWallet, await operatorContract.getAddress(), 10000)
await stake(operatorContract, await sponsorship1.getAddress(), 10000)

const operatorContractAddress = await operatorContract.getAddress()
broker = await startBroker({
privateKey: brokerWallet.privateKey,
extraPlugins: {
operator: {
operatorContractAddress,
}
}
})
await waitForCondition(async () => (await broker.getStreamrClient().getSubscriptions(stream.id)).length > 0)
// Ensure that heartbeat has been sent (setting heartbeatUpdateIntervalInMs lower did not help)
await waitForHeartbeatMessage(toEthereumAddress(operatorContractAddress))
const brokerDescriptor = await broker.getStreamrClient().getPeerDescriptor()
const operators = await client.getNode().discoverOperators(brokerDescriptor, StreamPartIDUtils.parse(`${stream.id}#0`))
expect(operators[0].nodeId).toEqual(brokerDescriptor.nodeId)
}, 60 * 1000)

})
4 changes: 3 additions & 1 deletion packages/sdk/src/exports.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ export { GroupKey as EncryptionKey } from './encryption/GroupKey'
export { UpdateEncryptionKeyOptions } from './encryption/LocalGroupKeyStore'
export { CONFIG_TEST } from './ConfigTest'
export { StreamDefinition } from './types'
export { formStorageNodeAssignmentStreamId } from './utils/utils'
export { formStorageNodeAssignmentStreamId, peerDescriptorTranslator } from './utils/utils'
export { SignerWithProvider } from './Authentication'
export { convertBytesToStreamMessage, convertStreamMessageToBytes } from './protocol/oldStreamMessageBinaryUtils'

Expand Down Expand Up @@ -77,6 +77,8 @@ export {
StreamMessageType
} from './protocol/StreamMessage'

export { OperatorDiscoveryRequest, OperatorDiscoveryResponse } from './generated/packages/sdk/protos/SdkRpc'

// These are exported for the internal Operator class
export {
Operator,
Expand Down

0 comments on commit 0d1215d

Please sign in to comment.