diff --git a/Cargo.lock b/Cargo.lock index 8674b888b0e4..d1d774ee6437 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -569,6 +569,30 @@ dependencies = [ "tracing", ] +[[package]] +name = "aws-sdk-ec2" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d77e41e0567b874c884661a1eb777f006679464110e6f95c7bafafe0fb607e10" +dependencies = [ + "aws-endpoint", + "aws-http", + "aws-sig-auth", + "aws-smithy-async", + "aws-smithy-client", + "aws-smithy-http", + "aws-smithy-http-tower", + "aws-smithy-query", + "aws-smithy-types", + "aws-smithy-xml", + "aws-types", + "bytes", + "fastrand", + "http", + "tokio-stream", + "tower", +] + [[package]] name = "aws-sdk-kinesis" version = "0.21.0" @@ -3620,7 +3644,7 @@ dependencies = [ "http", "madsim", "serde", - "serde_with 2.2.0", + "serde_with 2.3.1", "spin 0.9.5", "thiserror", "tokio", @@ -3644,8 +3668,7 @@ dependencies = [ [[package]] name = "madsim-rdkafka" version = "0.2.14-alpha" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "945034b3d7c612f5bed8a34dcefd9278801bab180470e92d4b2297ddb3023cc8" +source = "git+https://github.com/madsim-rs/madsim.git?rev=43e025d#43e025db997df923cf6b891cfb874fe6dabba994" dependencies = [ "async-channel", "async-trait", @@ -5435,8 +5458,7 @@ dependencies = [ [[package]] name = "rdkafka-sys" version = "4.3.0+1.9.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d222a401698c7f2010e3967353eae566d9934dcda49c29910da922414ab4e3f4" +source = "git+https://github.com/MaterializeInc/rust-rdkafka?rev=8ea07c4#8ea07c4d2b96636ff093e670bc921892aee0d56a" dependencies = [ "cmake", "libc", @@ -5650,7 +5672,7 @@ dependencies = [ "regex", "serde", "serde_json", - "serde_with 2.2.0", + "serde_with 2.3.1", "serde_yaml", "tempfile", "workspace-hack", @@ -6000,6 +6022,7 @@ dependencies = [ "apache-avro", "async-trait", "aws-config", + "aws-sdk-ec2", "aws-sdk-kinesis", "aws-sdk-s3", "aws-smithy-http", @@ -6043,7 +6066,7 @@ dependencies = [ "serde", "serde_derive", "serde_json", - "serde_with 2.2.0", + "serde_with 2.3.1", "simd-json", "tempfile", "thiserror", @@ -6254,6 +6277,8 @@ dependencies = [ "arc-swap", "assert_matches", "async-trait", + "aws-config", + "aws-sdk-ec2", "axum", "bytes", "clap 4.1.8", @@ -6364,7 +6389,7 @@ dependencies = [ "risingwave_frontend", "risingwave_sqlparser", "serde", - "serde_with 2.2.0", + "serde_with 2.3.1", "serde_yaml", "tempfile", "walkdir", @@ -6508,7 +6533,7 @@ dependencies = [ "madsim-tokio", "risingwave_sqlparser", "serde", - "serde_with 2.2.0", + "serde_with 2.3.1", "serde_yaml", "walkdir", "workspace-hack", @@ -6550,7 +6575,7 @@ dependencies = [ "regex", "risingwave_rt", "serde", - "serde_with 2.2.0", + "serde_with 2.3.1", "tokio-postgres", "tokio-stream", "toml 0.4.10", @@ -7070,9 +7095,9 @@ dependencies = [ [[package]] name = "serde_with" -version = "2.2.0" +version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30d904179146de381af4c93d3af6ca4984b3152db687dacb9c3c35e86f39809c" +checksum = "85456ffac572dc8826334164f2fb6fb40a7c766aebe195a2a21ee69ee2885ecf" dependencies = [ "base64 0.13.1", "chrono", @@ -7080,7 +7105,7 @@ dependencies = [ "indexmap", "serde", "serde_json", - "serde_with_macros 2.2.0", + "serde_with_macros 2.3.1", "time 0.3.17", ] @@ -7098,9 +7123,9 @@ dependencies = [ [[package]] name = "serde_with_macros" -version = "2.2.0" +version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"a1966009f3c05f095697c537312f5415d1e3ed31ce0a56942bac4c771c5c335e" +checksum = "7cbcd6104f8a4ab6af7f6be2a0da6be86b9de3c401f6e86bb856ab2af739232f" dependencies = [ "darling 0.14.3", "proc-macro2", @@ -8849,6 +8874,8 @@ dependencies = [ "ring", "scopeguard", "serde", + "serde_json", + "serde_with 2.3.1", "smallvec", "socket2", "strum", diff --git a/Cargo.toml b/Cargo.toml index 83dfb72af155..1be6dbbbc288 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -58,6 +58,7 @@ repository = "https://github.com/risingwavelabs/risingwave" aws-config = { version = "0.51", default-features = false, features = ["rt-tokio", "native-tls"] } aws-sdk-kinesis = { version = "0.21", default-features = false, features = ["rt-tokio", "native-tls"] } aws-sdk-s3 = { version = "0.21", default-features = false, features = ["rt-tokio","native-tls"] } +aws-sdk-ec2 = { version = "0.21", default-features = false, features = ["rt-tokio","native-tls"] } aws-sdk-sqs = { version = "0.21", default-features = false, features = ["rt-tokio", "native-tls"] } aws-smithy-http = "0.51" aws-smithy-types = "0.51" @@ -122,3 +123,4 @@ tokio-stream = { git = "https://github.com/madsim-rs/tokio.git", rev = "0c25710" tokio-retry = { git = "https://github.com/madsim-rs/rust-tokio-retry.git", rev = "95e2fd3" } tokio-postgres = { git = "https://github.com/madsim-rs/rust-postgres.git", rev = "87ca1dc" } postgres-types = { git = "https://github.com/madsim-rs/rust-postgres.git", rev = "87ca1dc" } +madsim-rdkafka = { git = "https://github.com/madsim-rs/madsim.git", rev = "43e025d" } diff --git a/dashboard/proto/gen/catalog.ts b/dashboard/proto/gen/catalog.ts index bcd7c1f140ab..7ac3e72f2081 100644 --- a/dashboard/proto/gen/catalog.ts +++ b/dashboard/proto/gen/catalog.ts @@ -181,6 +181,23 @@ export interface Sink_PropertiesEntry { value: string; } +export interface Connection { + id: number; + name: string; + info?: { $case: "privateLinkService"; privateLinkService: Connection_PrivateLinkService }; +} + +export interface Connection_PrivateLinkService { + provider: string; + endpointId: string; + dnsEntries: { [key: string]: string }; +} + +export interface Connection_PrivateLinkService_DnsEntriesEntry { + key: string; + value: string; +} + export interface Index { id: number; schemaId: number; @@ -751,6 +768,128 @@ export const Sink_PropertiesEntry = { }, }; +function createBaseConnection(): Connection { + return { id: 0, name: "", info: undefined }; +} + +export const Connection = { + fromJSON(object: any): Connection { + return { + id: isSet(object.id) ? Number(object.id) : 0, + name: isSet(object.name) ? String(object.name) : "", + info: isSet(object.privateLinkService) + ? { + $case: "privateLinkService", + privateLinkService: Connection_PrivateLinkService.fromJSON(object.privateLinkService), + } + : undefined, + }; + }, + + toJSON(message: Connection): unknown { + const obj: any = {}; + message.id !== undefined && (obj.id = Math.round(message.id)); + message.name !== undefined && (obj.name = message.name); + message.info?.$case === "privateLinkService" && (obj.privateLinkService = message.info?.privateLinkService + ? Connection_PrivateLinkService.toJSON(message.info?.privateLinkService) + : undefined); + return obj; + }, + + fromPartial, I>>(object: I): Connection { + const message = createBaseConnection(); + message.id = object.id ?? 0; + message.name = object.name ?? 
""; + if ( + object.info?.$case === "privateLinkService" && + object.info?.privateLinkService !== undefined && + object.info?.privateLinkService !== null + ) { + message.info = { + $case: "privateLinkService", + privateLinkService: Connection_PrivateLinkService.fromPartial(object.info.privateLinkService), + }; + } + return message; + }, +}; + +function createBaseConnection_PrivateLinkService(): Connection_PrivateLinkService { + return { provider: "", endpointId: "", dnsEntries: {} }; +} + +export const Connection_PrivateLinkService = { + fromJSON(object: any): Connection_PrivateLinkService { + return { + provider: isSet(object.provider) ? String(object.provider) : "", + endpointId: isSet(object.endpointId) ? String(object.endpointId) : "", + dnsEntries: isObject(object.dnsEntries) + ? Object.entries(object.dnsEntries).reduce<{ [key: string]: string }>((acc, [key, value]) => { + acc[key] = String(value); + return acc; + }, {}) + : {}, + }; + }, + + toJSON(message: Connection_PrivateLinkService): unknown { + const obj: any = {}; + message.provider !== undefined && (obj.provider = message.provider); + message.endpointId !== undefined && (obj.endpointId = message.endpointId); + obj.dnsEntries = {}; + if (message.dnsEntries) { + Object.entries(message.dnsEntries).forEach(([k, v]) => { + obj.dnsEntries[k] = v; + }); + } + return obj; + }, + + fromPartial, I>>( + object: I, + ): Connection_PrivateLinkService { + const message = createBaseConnection_PrivateLinkService(); + message.provider = object.provider ?? ""; + message.endpointId = object.endpointId ?? ""; + message.dnsEntries = Object.entries(object.dnsEntries ?? {}).reduce<{ [key: string]: string }>( + (acc, [key, value]) => { + if (value !== undefined) { + acc[key] = String(value); + } + return acc; + }, + {}, + ); + return message; + }, +}; + +function createBaseConnection_PrivateLinkService_DnsEntriesEntry(): Connection_PrivateLinkService_DnsEntriesEntry { + return { key: "", value: "" }; +} + +export const Connection_PrivateLinkService_DnsEntriesEntry = { + fromJSON(object: any): Connection_PrivateLinkService_DnsEntriesEntry { + return { key: isSet(object.key) ? String(object.key) : "", value: isSet(object.value) ? String(object.value) : "" }; + }, + + toJSON(message: Connection_PrivateLinkService_DnsEntriesEntry): unknown { + const obj: any = {}; + message.key !== undefined && (obj.key = message.key); + message.value !== undefined && (obj.value = message.value); + return obj; + }, + + fromPartial, I>>( + object: I, + ): Connection_PrivateLinkService_DnsEntriesEntry { + const message = createBaseConnection_PrivateLinkService_DnsEntriesEntry(); + message.key = object.key ?? ""; + message.value = object.value ?? 
""; + return message; + }, +}; + function createBaseIndex(): Index { return { id: 0, diff --git a/dashboard/proto/gen/ddl_service.ts b/dashboard/proto/gen/ddl_service.ts index 9f55ed090edf..56ba0f8aefac 100644 --- a/dashboard/proto/gen/ddl_service.ts +++ b/dashboard/proto/gen/ddl_service.ts @@ -1,5 +1,5 @@ /* eslint-disable */ -import { ColIndexMapping, Database, Function, Index, Schema, Sink, Source, Table, View } from "./catalog"; +import { ColIndexMapping, Connection, Database, Function, Index, Schema, Sink, Source, Table, View } from "./catalog"; import { Status } from "./common"; import { StreamFragmentGraph } from "./stream_plan"; @@ -243,6 +243,36 @@ export interface GetDdlProgressResponse { ddlProgress: DdlProgress[]; } +export interface CreateConnectionRequest { + payload?: { $case: "privateLink"; privateLink: CreateConnectionRequest_PrivateLink }; +} + +export interface CreateConnectionRequest_PrivateLink { + provider: string; + serviceName: string; + availabilityZones: string[]; +} + +export interface CreateConnectionResponse { + connectionId: number; + /** global catalog version */ + version: number; +} + +export interface ListConnectionsRequest { +} + +export interface ListConnectionsResponse { + connections: Connection[]; +} + +export interface DropConnectionRequest { + connectionName: string; +} + +export interface DropConnectionResponse { +} + function createBaseCreateDatabaseRequest(): CreateDatabaseRequest { return { db: undefined }; } @@ -1499,6 +1529,198 @@ export const GetDdlProgressResponse = { }, }; +function createBaseCreateConnectionRequest(): CreateConnectionRequest { + return { payload: undefined }; +} + +export const CreateConnectionRequest = { + fromJSON(object: any): CreateConnectionRequest { + return { + payload: isSet(object.privateLink) + ? { $case: "privateLink", privateLink: CreateConnectionRequest_PrivateLink.fromJSON(object.privateLink) } + : undefined, + }; + }, + + toJSON(message: CreateConnectionRequest): unknown { + const obj: any = {}; + message.payload?.$case === "privateLink" && (obj.privateLink = message.payload?.privateLink + ? CreateConnectionRequest_PrivateLink.toJSON(message.payload?.privateLink) + : undefined); + return obj; + }, + + fromPartial, I>>(object: I): CreateConnectionRequest { + const message = createBaseCreateConnectionRequest(); + if ( + object.payload?.$case === "privateLink" && + object.payload?.privateLink !== undefined && + object.payload?.privateLink !== null + ) { + message.payload = { + $case: "privateLink", + privateLink: CreateConnectionRequest_PrivateLink.fromPartial(object.payload.privateLink), + }; + } + return message; + }, +}; + +function createBaseCreateConnectionRequest_PrivateLink(): CreateConnectionRequest_PrivateLink { + return { provider: "", serviceName: "", availabilityZones: [] }; +} + +export const CreateConnectionRequest_PrivateLink = { + fromJSON(object: any): CreateConnectionRequest_PrivateLink { + return { + provider: isSet(object.provider) ? String(object.provider) : "", + serviceName: isSet(object.serviceName) ? String(object.serviceName) : "", + availabilityZones: Array.isArray(object?.availabilityZones) + ? 
object.availabilityZones.map((e: any) => String(e)) + : [], + }; + }, + + toJSON(message: CreateConnectionRequest_PrivateLink): unknown { + const obj: any = {}; + message.provider !== undefined && (obj.provider = message.provider); + message.serviceName !== undefined && (obj.serviceName = message.serviceName); + if (message.availabilityZones) { + obj.availabilityZones = message.availabilityZones.map((e) => e); + } else { + obj.availabilityZones = []; + } + return obj; + }, + + fromPartial<I extends Exact<DeepPartial<CreateConnectionRequest_PrivateLink>, I>>( + object: I, + ): CreateConnectionRequest_PrivateLink { + const message = createBaseCreateConnectionRequest_PrivateLink(); + message.provider = object.provider ?? ""; + message.serviceName = object.serviceName ?? ""; + message.availabilityZones = object.availabilityZones?.map((e) => e) || []; + return message; + }, +}; + +function createBaseCreateConnectionResponse(): CreateConnectionResponse { + return { connectionId: 0, version: 0 }; +} + +export const CreateConnectionResponse = { + fromJSON(object: any): CreateConnectionResponse { + return { + connectionId: isSet(object.connectionId) ? Number(object.connectionId) : 0, + version: isSet(object.version) ? Number(object.version) : 0, + }; + }, + + toJSON(message: CreateConnectionResponse): unknown { + const obj: any = {}; + message.connectionId !== undefined && (obj.connectionId = Math.round(message.connectionId)); + message.version !== undefined && (obj.version = Math.round(message.version)); + return obj; + }, + + fromPartial<I extends Exact<DeepPartial<CreateConnectionResponse>, I>>(object: I): CreateConnectionResponse { + const message = createBaseCreateConnectionResponse(); + message.connectionId = object.connectionId ?? 0; + message.version = object.version ?? 0; + return message; + }, +}; + +function createBaseListConnectionsRequest(): ListConnectionsRequest { + return {}; +} + +export const ListConnectionsRequest = { + fromJSON(_: any): ListConnectionsRequest { + return {}; + }, + + toJSON(_: ListConnectionsRequest): unknown { + const obj: any = {}; + return obj; + }, + + fromPartial<I extends Exact<DeepPartial<ListConnectionsRequest>, I>>(_: I): ListConnectionsRequest { + const message = createBaseListConnectionsRequest(); + return message; + }, +}; + +function createBaseListConnectionsResponse(): ListConnectionsResponse { + return { connections: [] }; +} + +export const ListConnectionsResponse = { + fromJSON(object: any): ListConnectionsResponse { + return { + connections: Array.isArray(object?.connections) ? object.connections.map((e: any) => Connection.fromJSON(e)) : [], + }; + }, + + toJSON(message: ListConnectionsResponse): unknown { + const obj: any = {}; + if (message.connections) { + obj.connections = message.connections.map((e) => e ? Connection.toJSON(e) : undefined); + } else { + obj.connections = []; + } + return obj; + }, + + fromPartial<I extends Exact<DeepPartial<ListConnectionsResponse>, I>>(object: I): ListConnectionsResponse { + const message = createBaseListConnectionsResponse(); + message.connections = object.connections?.map((e) => Connection.fromPartial(e)) || []; + return message; + }, +}; + +function createBaseDropConnectionRequest(): DropConnectionRequest { + return { connectionName: "" }; +} + +export const DropConnectionRequest = { + fromJSON(object: any): DropConnectionRequest { + return { connectionName: isSet(object.connectionName) ? 
String(object.connectionName) : "" }; + }, + + toJSON(message: DropConnectionRequest): unknown { + const obj: any = {}; + message.connectionName !== undefined && (obj.connectionName = message.connectionName); + return obj; + }, + + fromPartial<I extends Exact<DeepPartial<DropConnectionRequest>, I>>(object: I): DropConnectionRequest { + const message = createBaseDropConnectionRequest(); + message.connectionName = object.connectionName ?? ""; + return message; + }, +}; + +function createBaseDropConnectionResponse(): DropConnectionResponse { + return {}; +} + +export const DropConnectionResponse = { + fromJSON(_: any): DropConnectionResponse { + return {}; + }, + + toJSON(_: DropConnectionResponse): unknown { + const obj: any = {}; + return obj; + }, + + fromPartial<I extends Exact<DeepPartial<DropConnectionResponse>, I>>(_: I): DropConnectionResponse { + const message = createBaseDropConnectionResponse(); + return message; + }, +}; + type Builtin = Date | Function | Uint8Array | string | number | boolean | undefined; export type DeepPartial<T> = T extends Builtin ? T diff --git a/proto/catalog.proto b/proto/catalog.proto index 0501abfbb892..23c178043422 100644 --- a/proto/catalog.proto +++ b/proto/catalog.proto @@ -84,6 +84,20 @@ message Sink { string definition = 13; } +message Connection { + message PrivateLinkService { + string provider = 1; + string endpoint_id = 2; + map<string, string> dns_entries = 3; + } + + uint32 id = 1; + string name = 2; + oneof info { + PrivateLinkService private_link_service = 3; + } +} + message Index { uint32 id = 1; uint32 schema_id = 2; diff --git a/proto/ddl_service.proto b/proto/ddl_service.proto index 4f9cf91271cd..81c8ead1f851 100644 --- a/proto/ddl_service.proto +++ b/proto/ddl_service.proto @@ -237,6 +237,36 @@ message GetDdlProgressResponse { repeated DdlProgress ddl_progress = 1; } +message CreateConnectionRequest { + message PrivateLink { + string provider = 1; + string service_name = 2; + repeated string availability_zones = 3; + } + + oneof payload { + PrivateLink private_link = 1; + } +} + +message CreateConnectionResponse { + uint32 connection_id = 1; + // global catalog version + uint64 version = 2; +} + +message ListConnectionsRequest {} + +message ListConnectionsResponse { + repeated catalog.Connection connections = 1; +} + +message DropConnectionRequest { + string connection_name = 1; +} + +message DropConnectionResponse {} + service DdlService { rpc CreateDatabase(CreateDatabaseRequest) returns (CreateDatabaseResponse); rpc DropDatabase(DropDatabaseRequest) returns (DropDatabaseResponse); @@ -260,4 +290,7 @@ rpc ReplaceTablePlan(ReplaceTablePlanRequest) returns (ReplaceTablePlanResponse); rpc GetTable(GetTableRequest) returns (GetTableResponse); rpc GetDdlProgress(GetDdlProgressRequest) returns (GetDdlProgressResponse); + rpc CreateConnection(CreateConnectionRequest) returns (CreateConnectionResponse); + rpc ListConnections(ListConnectionsRequest) returns (ListConnectionsResponse); + rpc DropConnection(DropConnectionRequest) returns (DropConnectionResponse); } diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index 72f25cfa3e0b..afa0535ecaeb 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -18,8 +18,9 @@ anyhow = "1" apache-avro = { git = "https://github.com/risingwavelabs/avro", branch = "waruto/modify-decimal", features = ["snappy", "zstandard", "bzip", "xz"] } async-trait = "0.1" aws-config = { workspace = true } +aws-sdk-ec2 = { workspace = true } aws-sdk-kinesis = { workspace = true } -aws-sdk-s3 = { workspace = true } +aws-sdk-s3 = { workspace = true } aws-smithy-http = { workspace = true } 
aws-types = { workspace = true } bincode = "1" @@ -58,7 +59,7 @@ rust_decimal = "1" serde = { version = "1", features = ["derive", "rc"] } serde_derive = "1" serde_json = "1" -serde_with = "2" +serde_with = { version = "2", features = ["json"] } simd-json = { git = "https://github.com/tabVersion/simd-json.git", branch = "main", features = ["key-to-lowercase"] } thiserror = "1" tokio = { version = "0.2", package = "madsim-tokio", features = ["rt", "rt-multi-thread", "sync", "macros", "time", "signal", "fs"] } diff --git a/src/connector/src/common.rs b/src/connector/src/common.rs index c27ec551b7da..f2a52d39fc03 100644 --- a/src/connector/src/common.rs +++ b/src/connector/src/common.rs @@ -13,22 +13,43 @@ // limitations under the License. use std::borrow::Cow; +use std::collections::HashMap; use aws_sdk_kinesis::Client as KinesisClient; use http::Uri; use rdkafka::ClientConfig; use serde_derive::{Deserialize, Serialize}; +use serde_with::json::JsonString; +use serde_with::serde_as; use crate::source::kinesis::config::AwsConfigInfo; // The file describes the common abstractions for each connector and can be used in both source and // sink. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AwsPrivateLinkItem { + pub service_name: String, + pub availability_zone: String, + pub port: u16, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AwsPrivateLinks { + pub provider: String, + pub infos: Vec<AwsPrivateLinkItem>, +} + +#[serde_as] #[derive(Debug, Clone, Serialize, Deserialize)] pub struct KafkaCommon { #[serde(rename = "properties.bootstrap.server", alias = "kafka.brokers")] pub brokers: String, + #[serde(rename = "broker.rewrite.endpoints")] + #[serde_as(as = "Option<JsonString>")] + pub broker_rewrite_map: Option<HashMap<String, String>>, + #[serde(rename = "topic", alias = "kafka.topic")] pub topic: String, diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs index 725518de9103..01dd331ad431 100644 --- a/src/connector/src/source/base.rs +++ b/src/connector/src/source/base.rs @@ -491,6 +491,25 @@ mod tests { } } + #[test] + fn test_extract_kafka_config() { + let props: HashMap<String, String> = convert_args!(hashmap!( + "connector" => "kafka", + "properties.bootstrap.server" => "b1,b2", + "topic" => "test", + "scan.startup.mode" => "earliest", + "broker.rewrite.endpoints" => r#"{"b-1:9092":"dns-1", "b-2:9092":"dns-2"}"#, + )); + + let props = ConnectorProperties::extract(props).unwrap(); + if let ConnectorProperties::Kafka(k) = props { + assert!(k.common.broker_rewrite_map.is_some()); + println!("{:?}", k.common.broker_rewrite_map); + } else { + panic!("extract kafka config failed"); + } + } + #[test] fn test_extract_cdc_properties() { let user_props_mysql: HashMap<String, String> = convert_args!(hashmap!( diff --git a/src/connector/src/source/kafka/enumerator/client.rs b/src/connector/src/source/kafka/enumerator/client.rs index 7f90cbe5ff68..b86aa4fd0a4d 100644 --- a/src/connector/src/source/kafka/enumerator/client.rs +++ b/src/connector/src/source/kafka/enumerator/client.rs @@ -16,13 +16,13 @@ use std::collections::HashMap; use anyhow::anyhow; use async_trait::async_trait; -use rdkafka::consumer::{BaseConsumer, Consumer, DefaultConsumerContext}; +use rdkafka::consumer::{BaseConsumer, Consumer}; use rdkafka::error::KafkaResult; use rdkafka::{Offset, TopicPartitionList}; use crate::source::base::SplitEnumerator; use crate::source::kafka::split::KafkaSplit; -use crate::source::kafka::{KafkaProperties, KAFKA_SYNC_CALL_TIMEOUT}; +use crate::source::kafka::{KafkaProperties, PrivateLinkConsumerContext, 
KAFKA_SYNC_CALL_TIMEOUT}; #[derive(Debug, Copy, Clone, Eq, PartialEq)] pub enum KafkaEnumeratorOffset { @@ -35,7 +35,7 @@ pub enum KafkaEnumeratorOffset { pub struct KafkaSplitEnumerator { broker_address: String, topic: String, - client: BaseConsumer, + client: BaseConsumer<PrivateLinkConsumerContext>, start_offset: KafkaEnumeratorOffset, // maybe used in the future for batch processing @@ -54,10 +54,10 @@ impl SplitEnumerator for KafkaSplitEnumerator { let common_props = &properties.common; let broker_address = common_props.brokers.clone(); + let broker_rewrite_map = common_props.broker_rewrite_map.clone(); let topic = common_props.topic.clone(); config.set("bootstrap.servers", &broker_address); common_props.set_security_properties(&mut config); - let mut scan_start_offset = match properties .scan_startup_mode .as_ref() @@ -79,7 +79,9 @@ scan_start_offset = KafkaEnumeratorOffset::Timestamp(time_offset) } - let client: BaseConsumer = config.create_with_context(DefaultConsumerContext).await?; + let client_ctx = PrivateLinkConsumerContext::new(broker_rewrite_map)?; + let client: BaseConsumer<PrivateLinkConsumerContext> = + config.create_with_context(client_ctx).await?; Ok(Self { broker_address, diff --git a/src/connector/src/source/kafka/mod.rs b/src/connector/src/source/kafka/mod.rs index f7dd3ae9cea5..27e9406119ef 100644 --- a/src/connector/src/source/kafka/mod.rs +++ b/src/connector/src/source/kafka/mod.rs @@ -17,16 +17,19 @@ use std::time::Duration; use serde::Deserialize; pub mod enumerator; +pub mod private_link; pub mod source; pub mod split; pub use enumerator::*; +pub use private_link::*; pub use source::*; pub use split::*; use crate::common::KafkaCommon; - pub const KAFKA_CONNECTOR: &str = "kafka"; +pub const KAFKA_PROPS_BROKER_KEY: &str = "properties.bootstrap.server"; +pub const KAFKA_PROPS_BROKER_KEY_ALIAS: &str = "kafka.brokers"; #[derive(Clone, Debug, Deserialize)] pub struct KafkaProperties { diff --git a/src/connector/src/source/kafka/private_link.rs b/src/connector/src/source/kafka/private_link.rs new file mode 100644 index 000000000000..020a85c173d1 --- /dev/null +++ b/src/connector/src/source/kafka/private_link.rs @@ -0,0 +1,67 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::collections::{BTreeMap, HashMap}; +use std::str::FromStr; + +use rdkafka::client::BrokerAddr; +use rdkafka::consumer::ConsumerContext; +use rdkafka::ClientContext; +use risingwave_common::util::addr::HostAddr; + +pub struct PrivateLinkConsumerContext { + rewrite_map: BTreeMap<BrokerAddr, BrokerAddr>, +} + +impl PrivateLinkConsumerContext { + pub fn new(broker_rewrite_map: Option<HashMap<String, String>>) -> anyhow::Result<Self> { + let rewrite_map: anyhow::Result<BTreeMap<BrokerAddr, BrokerAddr>> = broker_rewrite_map + .map_or(Ok(BTreeMap::new()), |addr_map| { + addr_map + .into_iter() + .map(|(old_addr, new_addr)| { + let old_addr = HostAddr::from_str(&old_addr)?; + let new_addr = HostAddr::from_str(&new_addr)?; + let old_addr = BrokerAddr { + host: old_addr.host, + port: old_addr.port.to_string(), + }; + let new_addr = BrokerAddr { + host: new_addr.host, + port: new_addr.port.to_string(), + }; + Ok((old_addr, new_addr)) + }) + .collect() + }); + let rewrite_map = rewrite_map?; + tracing::info!("broker addr rewrite map {:?}", rewrite_map); + Ok(Self { rewrite_map }) + } +} + +impl ClientContext for PrivateLinkConsumerContext { + fn rewrite_broker_addr(&self, addr: BrokerAddr) -> BrokerAddr { + match self.rewrite_map.get(&addr) { + None => addr, + Some(new_addr) => { + tracing::debug!("broker addr {:?} rewrote to {:?}", addr, new_addr); + new_addr.clone() + } + } + } +} + +// required by the trait bound of BaseConsumer +impl ConsumerContext for PrivateLinkConsumerContext {} diff --git a/src/connector/src/source/kafka/source/reader.rs b/src/connector/src/source/kafka/source/reader.rs index 9432a8028c47..63380043632e 100644 --- a/src/connector/src/source/kafka/source/reader.rs +++ b/src/connector/src/source/kafka/source/reader.rs @@ -20,13 +20,13 @@ use async_trait::async_trait; use futures::{StreamExt, TryStreamExt}; use futures_async_stream::try_stream; use rdkafka::config::RDKafkaLogLevel; -use rdkafka::consumer::{Consumer, DefaultConsumerContext, StreamConsumer}; +use rdkafka::consumer::{Consumer, StreamConsumer}; use rdkafka::{ClientConfig, Message, Offset, TopicPartitionList}; use crate::impl_common_split_reader_logic; use crate::parser::ParserConfig; use crate::source::base::{SourceMessage, MAX_CHUNK_SIZE}; -use crate::source::kafka::KafkaProperties; +use crate::source::kafka::{KafkaProperties, PrivateLinkConsumerContext}; use crate::source::{ BoxSourceWithStateStream, Column, SourceContextRef, SplitId, SplitImpl, SplitMetaData, SplitReader, @@ -35,7 +35,7 @@ use crate::source::{ impl_common_split_reader_logic!(KafkaSplitReader, KafkaProperties); pub struct KafkaSplitReader { - consumer: StreamConsumer, + consumer: StreamConsumer<PrivateLinkConsumerContext>, start_offset: Option<i64>, stop_offset: Option<i64>, bytes_per_second: usize, @@ -61,6 +61,7 @@ impl SplitReader for KafkaSplitReader { let mut config = ClientConfig::new(); let bootstrap_servers = &properties.common.brokers; + let broker_rewrite_map = properties.common.broker_rewrite_map.clone(); // disable partition eof config.set("enable.partition.eof", "false"); @@ -83,9 +84,10 @@ impl SplitReader for KafkaSplitReader { ); } - let consumer: StreamConsumer = config + let client_ctx = PrivateLinkConsumerContext::new(broker_rewrite_map)?; + let consumer: StreamConsumer<PrivateLinkConsumerContext> = config .set_log_level(RDKafkaLogLevel::Info) - .create_with_context(DefaultConsumerContext) + .create_with_context(client_ctx) .await .map_err(|e| anyhow!("failed to create kafka consumer: {}", e))?; diff --git a/src/ctl/src/cmd_impl/meta.rs b/src/ctl/src/cmd_impl/meta.rs index 97dbdf48c32f..abbd9d1a2bc2 100644 --- a/src/ctl/src/cmd_impl/meta.rs +++ 
b/src/ctl/src/cmd_impl/meta.rs @@ -14,10 +14,12 @@ mod backup_meta; mod cluster_info; +mod connection; mod pause_resume; mod reschedule; pub use backup_meta::*; pub use cluster_info::*; +pub use connection::*; pub use pause_resume::*; pub use reschedule::*; diff --git a/src/ctl/src/cmd_impl/meta/connection.rs b/src/ctl/src/cmd_impl/meta/connection.rs new file mode 100644 index 000000000000..32e1c420c48b --- /dev/null +++ b/src/ctl/src/cmd_impl/meta/connection.rs @@ -0,0 +1,72 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use risingwave_pb::catalog::connection::Info; +use risingwave_pb::ddl_service::create_connection_request; +use risingwave_pb::ddl_service::create_connection_request::PrivateLink; + +use crate::common::CtlContext; + +pub async fn create_connection( + context: &CtlContext, + provider: String, + service_name: String, + availability_zone: String, +) -> anyhow::Result<()> { + let meta_client = context.meta_client().await?; + let availability_zones: Vec<String> = availability_zone + .split(',') + .map(|str| str.to_string()) + .collect(); + let conn_id = meta_client + .create_connection(create_connection_request::Payload::PrivateLink( + PrivateLink { + provider, + service_name, + availability_zones, + }, + )) + .await?; + + println!("Create connection success id#{}", conn_id); + Ok(()) +} + +pub async fn drop_connection(context: &CtlContext, conn_name: String) -> anyhow::Result<()> { + let meta_client = context.meta_client().await?; + meta_client.drop_connection(&conn_name).await?; + println!("Drop connection {} success", conn_name); + Ok(()) +} + +pub async fn list_connections(context: &CtlContext) -> anyhow::Result<()> { + let meta_client = context.meta_client().await?; + let connections = meta_client.list_connections(None).await?; + + for conn in connections { + println!( + "Connection#{}, service_name: {}, {}", + conn.id, + conn.name, + match conn.info { + Some(Info::PrivateLinkService(svc)) => format!( + "PrivateLink: endpoint_id: {}, dns_entries: {:?}", + svc.endpoint_id, svc.dns_entries, + ), + None => "None".to_string(), + } + ); + } + Ok(()) +} diff --git a/src/ctl/src/lib.rs b/src/ctl/src/lib.rs index b96ab6c9a0e5..6d41574fd7a8 100644 --- a/src/ctl/src/lib.rs +++ b/src/ctl/src/lib.rs @@ -203,6 +203,25 @@ enum MetaCommands { BackupMeta, /// delete meta snapshots DeleteMetaSnapshots { snapshot_ids: Vec<u64> }, + + /// Create a new connection object + CreateConnection { + #[clap(long)] + provider: String, + #[clap(long)] + service_name: String, + #[clap(long)] + availability_zones: String, + }, + + /// List all existing connections in the catalog + ListConnections, + + /// Drop a connection by its name + DropConnection { + #[clap(long)] + connection_name: String, + }, } pub async fn start(opts: CliOpts) -> Result<()> { @@ -314,6 +333,20 @@ pub async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> { Commands::Meta(MetaCommands::DeleteMetaSnapshots { snapshot_ids }) => { cmd_impl::meta::delete_meta_snapshots(context, 
&snapshot_ids).await? } + Commands::Meta(MetaCommands::CreateConnection { + provider, + service_name, + availability_zones, + }) => { + cmd_impl::meta::create_connection(context, provider, service_name, availability_zones) + .await? + } + Commands::Meta(MetaCommands::ListConnections) => { + cmd_impl::meta::list_connections(context).await? + } + Commands::Meta(MetaCommands::DropConnection { connection_name }) => { + cmd_impl::meta::drop_connection(context, connection_name).await? + } Commands::Trace => cmd_impl::trace::trace(context).await?, Commands::Profile { sleep } => cmd_impl::profile::profile(context, sleep).await?, } diff --git a/src/meta/Cargo.toml b/src/meta/Cargo.toml index 22a90adbf3a5..d5a13237e98b 100644 --- a/src/meta/Cargo.toml +++ b/src/meta/Cargo.toml @@ -18,6 +18,8 @@ anyhow = "1" arc-swap = "1" assert_matches = "1" async-trait = "0.1" +aws-config = { workspace = true } +aws-sdk-ec2 = { workspace = true } bytes = { version = "1", features = ["serde"] } clap = { version = "4", features = ["derive", "env"] } crepe = "0.1" diff --git a/src/meta/src/error.rs b/src/meta/src/error.rs index 4b5e41f99755..7a3485fb2ede 100644 --- a/src/meta/src/error.rs +++ b/src/meta/src/error.rs @@ -172,6 +172,15 @@ impl From for MetaError { } } +impl<E> From<aws_sdk_ec2::types::SdkError<E>> for MetaError +where + E: std::error::Error + Sync + Send + 'static, +{ + fn from(e: aws_sdk_ec2::types::SdkError<E>) -> Self { + MetaErrorInner::Internal(e.into()).into() + } +} + impl From<MetaError> for tonic::Status { fn from(err: MetaError) -> Self { match &*err.inner { diff --git a/src/meta/src/lib.rs b/src/meta/src/lib.rs index 8e0b42b0514d..2c91ace5ca79 100644 --- a/src/meta/src/lib.rs +++ b/src/meta/src/lib.rs @@ -61,6 +61,12 @@ use crate::rpc::server::{rpc_serve, AddressInfo, MetaStoreBackend}; #[derive(Debug, Clone, Parser)] pub struct MetaNodeOpts { + #[clap(long, env = "RW_VPC_ID")] + vpc_id: Option<String>, + + #[clap(long, env = "RW_VPC_SECURITY_GROUP_ID")] + security_group_id: Option<String>, + // TODO: rename to listen_address and separate out the port. #[clap(long, env = "RW_LISTEN_ADDR", default_value = "127.0.0.1:5690")] listen_addr: String, @@ -228,6 +234,8 @@ pub fn start(opts: MetaNodeOpts) -> Pin<Box<dyn Future<Output = ()> + Send>> { periodic_compaction_interval_sec: config.meta.periodic_compaction_interval_sec, node_num_monitor_interval_sec: config.meta.node_num_monitor_interval_sec, prometheus_endpoint: opts.prometheus_endpoint, + vpc_id: opts.vpc_id, + security_group_id: opts.security_group_id, connector_rpc_endpoint: opts.connector_rpc_endpoint, periodic_space_reclaim_compaction_interval_sec: config .meta diff --git a/src/meta/src/manager/catalog/connection.rs b/src/meta/src/manager/catalog/connection.rs new file mode 100644 index 000000000000..12364530255c --- /dev/null +++ b/src/meta/src/manager/catalog/connection.rs @@ -0,0 +1,69 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::collections::{BTreeMap, HashMap}; + +use risingwave_pb::catalog::Connection; + +use crate::manager::{ConnectionId, MetaSrvEnv}; +use crate::model::MetadataModel; +use crate::storage::MetaStore; +use crate::{MetaError, MetaResult}; + +pub struct ConnectionManager { + pub connections: BTreeMap<ConnectionId, Connection>, + + // index by service name + pub connection_by_name: HashMap<String, ConnectionId>, +} + +impl ConnectionManager { + pub async fn new<S: MetaStore>(env: MetaSrvEnv<S>) -> MetaResult<Self> { + // load connections from meta store + let connections = Connection::list(env.meta_store()).await?; + let connections = BTreeMap::from_iter(connections.into_iter().map(|conn| (conn.id, conn))); + + let connection_by_name = connections + .values() + .map(|conn| (conn.name.clone(), conn.id)) + .collect(); + + Ok(Self { + connections, + connection_by_name, + }) + } + + pub(crate) fn check_connection_duplicated(&self, conn_name: &str) -> MetaResult<()> { + if self + .connections + .values() + .any(|conn| conn.name.eq(conn_name)) + { + Err(MetaError::catalog_duplicated("connection", conn_name)) + } else { + Ok(()) + } + } + + pub fn get_connection_by_name(&self, name: &str) -> Option<&Connection> { + self.connection_by_name + .get(name) + .and_then(|id| self.connections.get(id)) + } + + pub fn list_connections(&self) -> Vec<Connection> { + self.connections.values().cloned().collect() + } +} diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs index 3f84f2102da4..b35126e1507d 100644 --- a/src/meta/src/manager/catalog/mod.rs +++ b/src/meta/src/manager/catalog/mod.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod connection; mod database; mod fragment; mod user; @@ -22,6 +23,7 @@ use std::option::Option::Some; use std::sync::Arc; use anyhow::{anyhow, Context}; +pub use connection::*; pub use database::*; pub use fragment::*; use itertools::Itertools; @@ -32,7 +34,9 @@ use risingwave_common::catalog::{ }; use risingwave_common::{bail, ensure}; use risingwave_pb::catalog::table::OptionalAssociatedSourceId; -use risingwave_pb::catalog::{Database, Function, Index, Schema, Sink, Source, Table, View}; +use risingwave_pb::catalog::{ + Connection, Database, Function, Index, Schema, Sink, Source, Table, View, +}; use risingwave_pb::meta::subscribe_response::{Info, Operation}; use risingwave_pb::user::grant_privilege::{ActionWithGrantOption, Object}; use risingwave_pb::user::update_user_request::UpdateField; @@ -57,6 +61,7 @@ pub type ViewId = u32; pub type FunctionId = u32; pub type UserId = u32; +pub type ConnectionId = u32; /// `commit_meta` provides a wrapper for committing metadata changes to both in-memory and /// meta store. 
@@ -101,13 +106,19 @@ pub struct CatalogManager<S: MetaStore> { pub struct CatalogManagerCore<S: MetaStore> { pub database: DatabaseManager<S>, pub user: UserManager<S>, + pub connection: ConnectionManager, } impl<S> CatalogManagerCore<S> where S: MetaStore { async fn new(env: MetaSrvEnv<S>) -> MetaResult<Self> { let database = DatabaseManager::new(env.clone()).await?; - let user = UserManager::new(env, &database).await?; - Ok(Self { database, user }) + let user = UserManager::new(env.clone(), &database).await?; + let connection = ConnectionManager::new(env).await?; + Ok(Self { + database, + user, + connection, + }) } } @@ -325,6 +336,40 @@ where } } + /// Each connection is identified by a unique name + pub async fn create_connection( + &self, + connection: Connection, + ) -> MetaResult<NotificationVersion> { + let core = &mut self.core.lock().await.connection; + core.check_connection_duplicated(&connection.name)?; + + let conn_id = connection.id; + let conn_name = connection.name.clone(); + let mut connections = BTreeMapTransaction::new(&mut core.connections); + connections.insert(conn_id, connection); + commit_meta!(self, connections)?; + + core.connection_by_name.insert(conn_name, conn_id); + // Currently we don't need to notify frontend, so just fill 0 here + Ok(0) + } + + pub async fn drop_connection(&self, conn_name: &str) -> MetaResult<NotificationVersion> { + let core = &mut self.core.lock().await.connection; + + let conn_id = core + .connection_by_name + .remove(conn_name) + .ok_or_else(|| anyhow!("connection {} not found", conn_name))?; + + let mut connections = BTreeMapTransaction::new(&mut core.connections); + connections.remove(conn_id); + commit_meta!(self, connections)?; + // Currently we don't need to notify frontend, so just fill 0 here + Ok(0) + } + pub async fn create_schema(&self, schema: &Schema) -> MetaResult<NotificationVersion> { let core = &mut *self.core.lock().await; let database_core = &mut core.database; @@ -871,6 +916,13 @@ } } + pub async fn get_connection_by_name(&self, name: &str) -> MetaResult<Connection> { + let core = &mut self.core.lock().await.connection; + core.get_connection_by_name(name) + .cloned() + .ok_or_else(|| anyhow!(format!("could not find connection by the given name")).into()) + } + pub async fn finish_create_source_procedure( &self, source: &Source, @@ -1521,6 +1573,10 @@ Ok(()) } + pub async fn list_connections(&self) -> Vec<Connection> { + self.core.lock().await.connection.list_connections() + } + pub async fn list_databases(&self) -> Vec<Database> { self.core.lock().await.database.list_databases() } diff --git a/src/meta/src/manager/env.rs b/src/meta/src/manager/env.rs index ff244bad41b3..1840669c57b2 100644 --- a/src/meta/src/manager/env.rs +++ b/src/meta/src/manager/env.rs @@ -87,6 +87,12 @@ pub struct MetaOpts { /// The prometheus endpoint for dashboard service. pub prometheus_endpoint: Option<String>, + /// The VPC id of the cluster. 
+ pub vpc_id: Option<String>, + + /// A usable security group id to assign to a vpc endpoint + pub security_group_id: Option<String>, + /// Endpoint of the connector node, there will be a sidecar connector node /// colocated with Meta node in the cloud environment pub connector_rpc_endpoint: Option<String>, @@ -113,6 +119,8 @@ impl MetaOpts { periodic_compaction_interval_sec: 60, node_num_monitor_interval_sec: 10, prometheus_endpoint: None, + vpc_id: None, + security_group_id: None, connector_rpc_endpoint: None, periodic_space_reclaim_compaction_interval_sec: 60, periodic_ttl_reclaim_compaction_interval_sec: 60, diff --git a/src/meta/src/manager/id.rs b/src/meta/src/manager/id.rs index ddbe8f4578f5..f728897656f0 100644 --- a/src/meta/src/manager/id.rs +++ b/src/meta/src/manager/id.rs @@ -141,6 +141,7 @@ pub mod IdCategory { pub const _Index: IdCategoryType = 14; pub const CompactionGroup: IdCategoryType = 15; pub const Function: IdCategoryType = 16; + pub const Connection: IdCategoryType = 17; } pub type IdGeneratorManagerRef<S> = Arc<IdGeneratorManager<S>>; @@ -163,6 +164,7 @@ pub struct IdGeneratorManager<S> { hummock_compaction_task: Arc<StoredIdGenerator<S>>, parallel_unit: Arc<StoredIdGenerator<S>>, compaction_group: Arc<StoredIdGenerator<S>>, + connection: Arc<StoredIdGenerator<S>>, } impl<S> IdGeneratorManager<S> where @@ -219,6 +221,9 @@ ) .await, ), + connection: Arc::new( + StoredIdGenerator::new(meta_store.clone(), "connection", None).await, + ), } } @@ -239,6 +244,7 @@ where IdCategory::ParallelUnit => &self.parallel_unit, IdCategory::HummockCompactionTask => &self.hummock_compaction_task, IdCategory::CompactionGroup => &self.compaction_group, + IdCategory::Connection => &self.connection, _ => unreachable!(), } } diff --git a/src/meta/src/model/connection.rs b/src/meta/src/model/connection.rs new file mode 100644 index 000000000000..7f762ab93120 --- /dev/null +++ b/src/meta/src/model/connection.rs @@ -0,0 +1,41 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use risingwave_pb::catalog::Connection; + +use crate::model::{MetadataModel, MetadataModelResult}; + +/// Column family name for connection. +const CONNECTION_CF_NAME: &str = "cf/connection"; + +impl MetadataModel for Connection { + type KeyType = u32; + type ProstType = Connection; + + fn cf_name() -> String { + CONNECTION_CF_NAME.to_string() + } + + fn to_protobuf(&self) -> Self::ProstType { + self.clone() + } + + fn from_protobuf(prost: Self::ProstType) -> Self { + prost + } + + fn key(&self) -> MetadataModelResult<Self::KeyType> { + Ok(self.id) + } +} diff --git a/src/meta/src/model/mod.rs b/src/meta/src/model/mod.rs index 8937051f49cb..5acbe36f63a2 100644 --- a/src/meta/src/model/mod.rs +++ b/src/meta/src/model/mod.rs @@ -15,6 +15,7 @@ mod barrier; mod catalog; mod cluster; +mod connection; mod error; mod notification; mod stream; @@ -177,6 +178,7 @@ macro_rules! for_all_metadata_models { { crate::model::stream::TableFragments }, { risingwave_pb::user::UserInfo }, { risingwave_pb::catalog::Function }, + { risingwave_pb::catalog::Connection }, // These items need not be included in a meta snapshot. 
{ crate::model::cluster::Worker }, { risingwave_pb::hummock::CompactTaskAssignment }, diff --git a/src/meta/src/rpc/cloud_provider.rs b/src/meta/src/rpc/cloud_provider.rs new file mode 100644 index 000000000000..7a53e3cf6ee0 --- /dev/null +++ b/src/meta/src/rpc/cloud_provider.rs @@ -0,0 +1,158 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; + +use aws_config::retry::RetryConfig; +use aws_sdk_ec2::model::{Filter, VpcEndpointType}; +use itertools::Itertools; +use risingwave_pb::catalog::connection::PrivateLinkService; + +use crate::MetaResult; + +const CLOUD_PROVIDER_AWS: &str = "aws"; + +#[derive(Clone)] +pub struct AwsEc2Client { + client: aws_sdk_ec2::Client, + /// `vpc_id`: The VPC of the running RisingWave instance + vpc_id: String, + security_group_id: String, +} + +impl AwsEc2Client { + pub async fn new(vpc_id: &str, security_group_id: &str) -> Self { + let sdk_config = aws_config::from_env() + .retry_config(RetryConfig::standard().with_max_attempts(4)) + .load() + .await; + let client = aws_sdk_ec2::Client::new(&sdk_config); + + Self { + client, + vpc_id: vpc_id.to_string(), + security_group_id: security_group_id.to_string(), + } + } + + /// `service_name`: The name of the endpoint service we want to access + pub async fn create_aws_private_link( + &self, + service_name: &str, + az_ids: &[String], + ) -> MetaResult<PrivateLinkService> { + let subnet_and_azs = self.describe_subnets(&self.vpc_id, az_ids).await?; + + let subnet_ids: Vec<String> = subnet_and_azs.iter().map(|(id, _, _)| id.clone()).collect(); + let az_to_azid_map: HashMap<String, String> = subnet_and_azs + .into_iter() + .map(|(_, az, az_id)| (az, az_id)) + .collect(); + + let (endpoint_id, endpoint_dns_names) = self + .create_vpc_endpoint( + &self.vpc_id, + service_name, + &self.security_group_id, + &subnet_ids, + ) + .await?; + + // The number of returned DNS names may not equal to the input AZs, + // because some AZs may not have a subnet in the RW VPC + let mut azid_to_dns_map = HashMap::new(); + for dns_name in &endpoint_dns_names { + for az in az_to_azid_map.keys() { + if dns_name.contains(az) { + azid_to_dns_map + .insert(az_to_azid_map.get(az).unwrap().clone(), dns_name.clone()); + break; + } + } + } + + Ok(PrivateLinkService { + provider: CLOUD_PROVIDER_AWS.to_string(), + endpoint_id, + dns_entries: azid_to_dns_map, + }) + } + + async fn describe_subnets( + &self, + vpc_id: &str, + az_ids: &[String], + ) -> MetaResult<Vec<(String, String, String)>> { + let vpc_filter = Filter::builder().name("vpc-id").values(vpc_id).build(); + let az_filter = Filter::builder() + .name("availability-zone-id") + .set_values(Some(Vec::from(az_ids))) + .build(); + let output = self + .client + .describe_subnets() + .set_filters(Some(vec![vpc_filter, az_filter])) + .send() + .await?; + + let subnets = output + .subnets + .unwrap_or_default() + .into_iter() + .unique_by(|s| s.availability_zone().unwrap_or_default().to_string()) + .map(|s| { + ( + s.subnet_id.unwrap_or_default(), + s.availability_zone.unwrap_or_default(), 
s.availability_zone_id.unwrap_or_default(), + ) + }) + .collect(); + Ok(subnets) + } + + async fn create_vpc_endpoint( + &self, + vpc_id: &str, + service_name: &str, + security_group_id: &str, + subnet_ids: &[String], + ) -> MetaResult<(String, Vec<String>)> { + let output = self + .client + .create_vpc_endpoint() + .vpc_endpoint_type(VpcEndpointType::Interface) + .vpc_id(vpc_id) + .security_group_ids(security_group_id) + .service_name(service_name) + .set_subnet_ids(Some(subnet_ids.to_owned())) + .send() + .await?; + + let endpoint = output.vpc_endpoint().unwrap(); + let mut dns_names = Vec::new(); + if let Some(dns_entries) = endpoint.dns_entries() { + dns_entries.iter().for_each(|e| { + if let Some(dns_name) = e.dns_name() { + dns_names.push(dns_name.to_string()); + } + }); + } + + Ok(( + endpoint.vpc_endpoint_id().unwrap_or_default().to_string(), + dns_names, + )) + } +} diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index e4d9c1b6b639..a52f526ca3ce 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -14,7 +14,7 @@ use itertools::Itertools; use risingwave_common::util::column_index_mapping::ColIndexMapping; -use risingwave_pb::catalog::{Database, Function, Schema, Source, Table, View}; +use risingwave_pb::catalog::{Connection, Database, Function, Schema, Source, Table, View}; use risingwave_pb::ddl_service::DdlProgress; use risingwave_pb::stream_plan::StreamFragmentGraph as StreamFragmentGraphProto; @@ -65,6 +65,8 @@ pub enum DdlCommand { CreatingStreamingJob(StreamingJob, StreamFragmentGraphProto), DropStreamingJob(StreamingJobId), ReplaceTable(StreamingJob, StreamFragmentGraphProto, ColIndexMapping), + CreateConnection(Connection), + DropConnection(String), } #[derive(Clone)] @@ -141,6 +143,10 @@ where ctrl.replace_table(stream_job, fragment_graph, table_col_index_mapping) .await } + DdlCommand::CreateConnection(connection) => { + ctrl.create_connection(connection).await + } + DdlCommand::DropConnection(conn_name) => ctrl.drop_connection(&conn_name).await, } }); handler.await.unwrap() @@ -219,6 +225,14 @@ self.catalog_manager.drop_view(view_id).await } + async fn create_connection(&self, connection: Connection) -> MetaResult<NotificationVersion> { + self.catalog_manager.create_connection(connection).await + } + + async fn drop_connection(&self, conn_name: &str) -> MetaResult<NotificationVersion> { + self.catalog_manager.drop_connection(conn_name).await + } + async fn create_streaming_job( &self, mut stream_job: StreamingJob, diff --git a/src/meta/src/rpc/mod.rs b/src/meta/src/rpc/mod.rs index dc323ea69284..4c1f738eee0a 100644 --- a/src/meta/src/rpc/mod.rs +++ b/src/meta/src/rpc/mod.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+mod cloud_provider; pub mod ddl_controller; mod election_client; mod intercept; diff --git a/src/meta/src/rpc/server.rs b/src/meta/src/rpc/server.rs index 2ec0ef07eb93..ff7e6c979868 100644 --- a/src/meta/src/rpc/server.rs +++ b/src/meta/src/rpc/server.rs @@ -51,6 +51,7 @@ use crate::manager::{ CatalogManager, ClusterManager, FragmentManager, IdleManager, MetaOpts, MetaSrvEnv, SystemParamsManager, }; +use crate::rpc::cloud_provider::AwsEc2Client; use crate::rpc::election_client::{ElectionClient, EtcdElectionClient}; use crate::rpc::metrics::{start_worker_info_monitor, MetaMetrics}; use crate::rpc::service::backup_service::BackupServiceImpl; @@ -441,8 +442,15 @@ pub async fn start_service_as_election_leader( compactor_manager.clone(), )); + let mut aws_cli = None; + if let Some(my_vpc_id) = &env.opts.vpc_id && let Some(security_group_id) = &env.opts.security_group_id { + let cli = AwsEc2Client::new(my_vpc_id, security_group_id).await; + aws_cli = Some(cli); + } + let ddl_srv = DdlServiceImpl::<S>::new( env.clone(), + aws_cli, catalog_manager.clone(), stream_manager.clone(), source_manager.clone(), diff --git a/src/meta/src/rpc/service/ddl_service.rs b/src/meta/src/rpc/service/ddl_service.rs index 7e1ff0b92caf..77a0f98be9fb 100644 --- a/src/meta/src/rpc/service/ddl_service.rs +++ b/src/meta/src/rpc/service/ddl_service.rs @@ -12,8 +12,17 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; + +use anyhow::anyhow; +use itertools::Itertools; use risingwave_common::util::column_index_mapping::ColIndexMapping; +use risingwave_common::util::iter_util::ZipEqFast; +use risingwave_connector::common::AwsPrivateLinks; +use risingwave_connector::source::kafka::{KAFKA_PROPS_BROKER_KEY, KAFKA_PROPS_BROKER_KEY_ALIAS}; +use risingwave_connector::source::KAFKA_CONNECTOR; use risingwave_pb::catalog::table::OptionalAssociatedSourceId; +use risingwave_pb::catalog::{connection, Connection}; use risingwave_pb::ddl_service::ddl_service_server::DdlService; use risingwave_pb::ddl_service::drop_table_request::SourceId as ProstSourceId; use risingwave_pb::ddl_service::*; @@ -25,10 +34,11 @@ use crate::manager::{ CatalogManagerRef, ClusterManagerRef, FragmentManagerRef, IdCategory, IdCategoryType, MetaSrvEnv, StreamingJob, }; +use crate::rpc::cloud_provider::AwsEc2Client; use crate::rpc::ddl_controller::{DdlCommand, DdlController, StreamingJobId}; use crate::storage::MetaStore; use crate::stream::{visit_fragment, GlobalStreamManagerRef, SourceManagerRef}; -use crate::MetaResult; +use crate::{MetaError, MetaResult}; #[derive(Clone)] pub struct DdlServiceImpl<S: MetaStore> { @@ -36,6 +46,7 @@ catalog_manager: CatalogManagerRef<S>, ddl_controller: DdlController<S>, + aws_client: Option<AwsEc2Client>, } impl<S> DdlServiceImpl<S> where @@ -45,6 +56,7 @@ #[allow(clippy::too_many_arguments)] pub fn new( env: MetaSrvEnv<S>, + aws_client: Option<AwsEc2Client>, catalog_manager: CatalogManagerRef<S>, stream_manager: GlobalStreamManagerRef<S>, source_manager: SourceManagerRef<S>, @@ -65,10 +77,30 @@ env, catalog_manager, ddl_controller, + aws_client, } } } +#[inline(always)] +fn is_kafka_connector(with_properties: &HashMap<String, String>) -> bool { + const UPSTREAM_SOURCE_KEY: &str = "connector"; + with_properties + .get(UPSTREAM_SOURCE_KEY) + .unwrap_or(&"".to_string()) + .to_lowercase() + .eq_ignore_ascii_case(KAFKA_CONNECTOR) +} + +#[inline(always)] +fn kafka_props_broker_key(with_properties: &HashMap<String, String>) -> &str { + if with_properties.contains_key(KAFKA_PROPS_BROKER_KEY) { 
KAFKA_PROPS_BROKER_KEY + } else { + KAFKA_PROPS_BROKER_KEY_ALIAS + } +} + #[async_trait::async_trait] impl<S> DdlService for DdlServiceImpl<S> where @@ -154,6 +186,10 @@ ) -> Result<Response<CreateSourceResponse>, Status> { let mut source = request.into_inner().get_source()?.clone(); + // resolve private links before starting the DDL procedure + self.resolve_private_link_info(&mut source.properties) + .await?; + let id = self.gen_unique_id::<{ IdCategory::Table }>().await?; source.id = id; @@ -535,6 +571,71 @@ ddl_progress: self.ddl_controller.get_ddl_progress().await, })) } + + async fn create_connection( + &self, + request: Request<CreateConnectionRequest>, + ) -> Result<Response<CreateConnectionResponse>, Status> { + if self.aws_client.is_none() { + return Err(Status::from(MetaError::unavailable( + "AWS client is not configured".into(), + ))); + } + + let req = request.into_inner(); + if req.payload.is_none() { + return Err(Status::invalid_argument("request is empty")); + } + + match req.payload.unwrap() { + create_connection_request::Payload::PrivateLink(link) => { + let cli = self.aws_client.as_ref().unwrap(); + let private_link_svc = cli + .create_aws_private_link(&link.service_name, &link.availability_zones) + .await?; + + let id = self.gen_unique_id::<{ IdCategory::Connection }>().await?; + let connection = Connection { + id, + name: link.service_name.clone(), + info: Some(connection::Info::PrivateLinkService(private_link_svc)), + }; + + // save private link info to catalog + self.ddl_controller + .run_command(DdlCommand::CreateConnection(connection)) + .await?; + + Ok(Response::new(CreateConnectionResponse { + connection_id: id, + version: 0, + })) + } + } + } + + async fn list_connections( + &self, + _request: Request<ListConnectionsRequest>, + ) -> Result<Response<ListConnectionsResponse>, Status> { + let conns = self.catalog_manager.list_connections().await; + Ok(Response::new(ListConnectionsResponse { + connections: conns, + })) + } + + async fn drop_connection( + &self, + request: Request<DropConnectionRequest>, + ) -> Result<Response<DropConnectionResponse>, Status> { + let req = request.into_inner(); + + self.ddl_controller + .run_command(DdlCommand::DropConnection(req.connection_name)) + .await?; + + Ok(Response::new(DropConnectionResponse {})) + } } impl<S> DdlServiceImpl<S> where @@ -545,4 +646,72 @@ let id = self.env.id_gen_manager().generate::<C>().await? 
as u32; Ok(id) } + + async fn resolve_private_link_info( + &self, + properties: &mut HashMap<String, String>, + ) -> MetaResult<()> { + let mut broker_rewrite_map = HashMap::new(); + const UPSTREAM_SOURCE_PRIVATE_LINK_KEY: &str = "private.links"; + if let Some(prop) = properties.get(UPSTREAM_SOURCE_PRIVATE_LINK_KEY) { + if !is_kafka_connector(properties) { + return Err(MetaError::from(anyhow!( + "Private link is only supported for Kafka connector", + ))); + } + + let servers = properties + .get(kafka_props_broker_key(properties)) + .cloned() + .ok_or(MetaError::from(anyhow!( + "Must specify brokers in WITH clause", + )))?; + + let broker_addrs = servers.split(',').collect_vec(); + let link_info: AwsPrivateLinks = serde_json::from_str(prop).map_err(|e| anyhow!(e))?; + // construct the rewrite mapping for brokers + for (link, broker) in link_info.infos.iter().zip_eq_fast(broker_addrs.into_iter()) { + let conn = self + .catalog_manager + .get_connection_by_name(&link.service_name) + .await?; + + if let Some(info) = conn.info { + match info { + connection::Info::PrivateLinkService(svc) => { + if svc.dns_entries.is_empty() { + return Err(MetaError::from(anyhow!( + "No available private link endpoints for Kafka broker {}", + broker + ))); + } + let default_dns = svc.dns_entries.values().next().unwrap(); + let target_dns = svc.dns_entries.get(&link.availability_zone); + match target_dns { + None => { + broker_rewrite_map.insert( + broker.to_string(), + format!("{}:{}", default_dns, link.port), + ); + } + Some(dns_name) => { + broker_rewrite_map.insert( + broker.to_string(), + format!("{}:{}", dns_name, link.port), + ); + } + } + } + } + } + } + + // save private link dns names into source properties, which + // will be extracted into KafkaProperties + let json = serde_json::to_string(&broker_rewrite_map).map_err(|e| anyhow!(e))?; + const BROKER_REWRITE_MAP_KEY: &str = "broker.rewrite.endpoints"; + properties.insert(BROKER_REWRITE_MAP_KEY.to_string(), json); + } + Ok(()) + } } diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index e1f36516e29e..77cc767bccfe 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -39,7 +39,7 @@ use risingwave_hummock_sdk::{ use risingwave_pb::backup_service::backup_service_client::BackupServiceClient; use risingwave_pb::backup_service::*; use risingwave_pb::catalog::{ - Database as ProstDatabase, Function as ProstFunction, Index as ProstIndex, + Connection, Database as ProstDatabase, Function as ProstFunction, Index as ProstIndex, Schema as ProstSchema, Sink as ProstSink, Source as ProstSource, Table as ProstTable, View as ProstView, }; @@ -123,6 +123,26 @@ impl MetaClient { .await } + pub async fn create_connection(&self, req: create_connection_request::Payload) -> Result<u32> { + let request = CreateConnectionRequest { payload: Some(req) }; + let resp = self.inner.create_connection(request).await?; + Ok(resp.connection_id) + } + + pub async fn list_connections(&self, _name: Option<&str>) -> Result<Vec<Connection>> { + let request = ListConnectionsRequest {}; + let resp = self.inner.list_connections(request).await?; + Ok(resp.connections) + } + + pub async fn drop_connection(&self, connection_name: &str) -> Result<()> { + let request = DropConnectionRequest { + connection_name: connection_name.to_string(), + }; + let _ = self.inner.drop_connection(request).await?; + Ok(()) + } + pub(crate) fn parse_meta_addr(meta_addr: &str) -> Result<MetaAddressStrategy> { if meta_addr.starts_with(Self::META_ADDRESS_LOAD_BALANCE_MODE_PREFIX) { let addr = meta_addr @@ 
-1358,6 +1378,9 @@ macro_rules! for_all_meta_rpc { ,{ ddl_client, replace_table_plan, ReplaceTablePlanRequest, ReplaceTablePlanResponse } ,{ ddl_client, risectl_list_state_tables, RisectlListStateTablesRequest, RisectlListStateTablesResponse } ,{ ddl_client, get_ddl_progress, GetDdlProgressRequest, GetDdlProgressResponse } + ,{ ddl_client, create_connection, CreateConnectionRequest, CreateConnectionResponse } + ,{ ddl_client, list_connections, ListConnectionsRequest, ListConnectionsResponse } + ,{ ddl_client, drop_connection, DropConnectionRequest, DropConnectionResponse } ,{ hummock_client, unpin_version_before, UnpinVersionBeforeRequest, UnpinVersionBeforeResponse } ,{ hummock_client, get_current_version, GetCurrentVersionRequest, GetCurrentVersionResponse } ,{ hummock_client, replay_version_delta, ReplayVersionDeltaRequest, ReplayVersionDeltaResponse } diff --git a/src/utils/runtime/src/lib.rs b/src/utils/runtime/src/lib.rs index 9a1c22f6feaf..c7a4c176ddbc 100644 --- a/src/utils/runtime/src/lib.rs +++ b/src/utils/runtime/src/lib.rs @@ -145,6 +145,7 @@ pub fn init_risingwave_logger(settings: LoggerSettings) { }; let filter = filter::Targets::new() + .with_target("aws_sdk_ec2", Level::INFO) .with_target("aws_sdk_s3", Level::INFO) .with_target("aws_config", Level::WARN) // Only enable WARN and ERROR for 3rd-party crates diff --git a/src/workspace-hack/Cargo.toml b/src/workspace-hack/Cargo.toml index ffd00dd98ca3..5c82906fbe73 100644 --- a/src/workspace-hack/Cargo.toml +++ b/src/workspace-hack/Cargo.toml @@ -86,6 +86,8 @@ reqwest = { version = "0.11", features = ["blocking", "json", "rustls-tls"] } ring = { version = "0.16", features = ["std"] } scopeguard = { version = "1" } serde = { version = "1", features = ["alloc", "derive", "rc"] } +serde_json = { version = "1", features = ["alloc"] } +serde_with = { version = "2", features = ["json"] } smallvec = { version = "1", default-features = false, features = ["serde"] } socket2 = { version = "0.4", default-features = false, features = ["all"] } strum = { version = "0.24", features = ["derive"] } @@ -179,6 +181,8 @@ reqwest = { version = "0.11", features = ["blocking", "json", "rustls-tls"] } ring = { version = "0.16", features = ["std"] } scopeguard = { version = "1" } serde = { version = "1", features = ["alloc", "derive", "rc"] } +serde_json = { version = "1", features = ["alloc"] } +serde_with = { version = "2", features = ["json"] } smallvec = { version = "1", default-features = false, features = ["serde"] } socket2 = { version = "0.4", default-features = false, features = ["all"] } strum = { version = "0.24", features = ["derive"] }
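
Note on the mechanism this patch introduces: the meta service's `resolve_private_link_info` serializes a broker-to-endpoint map into the `broker.rewrite.endpoints` source property (the `BROKER_REWRITE_MAP_KEY` constant above), and on the connector side `KafkaCommon` deserializes that JSON back into a map (via serde_with's `JsonString`) which `PrivateLinkConsumerContext::rewrite_broker_addr` consults per broker. Below is a minimal standalone sketch of that round trip, not part of the patch, assuming only a `serde_json` dependency; the broker and VPC-endpoint addresses are made-up examples.

```rust
// Sketch of the broker rewrite round trip under the assumptions above.
use std::collections::HashMap;

fn main() -> Result<(), serde_json::Error> {
    // Meta side: build broker -> private-link DNS pairs and store them in the
    // source's WITH properties as a JSON string, as resolve_private_link_info does.
    let rewrite_map = HashMap::from([
        ("broker-1:9092".to_string(), "vpce-az1.example.com:9001".to_string()),
        ("broker-2:9092".to_string(), "vpce-az2.example.com:9002".to_string()),
    ]);
    let mut properties: HashMap<String, String> = HashMap::new();
    properties.insert(
        "broker.rewrite.endpoints".to_string(),
        serde_json::to_string(&rewrite_map)?,
    );

    // Connector side: parse the JSON back into a map and look brokers up the
    // way rewrite_broker_addr does; unknown brokers pass through unchanged.
    let parsed: HashMap<String, String> =
        serde_json::from_str(&properties["broker.rewrite.endpoints"])?;
    let lookup = |addr: &str| parsed.get(addr).cloned().unwrap_or_else(|| addr.to_string());

    assert_eq!(lookup("broker-1:9092"), "vpce-az1.example.com:9001");
    assert_eq!(lookup("other:9092"), "other:9092");
    println!("rewrites: {parsed:?}");
    Ok(())
}
```

With the patch applied, the catalog entries feeding this map would come from `risectl meta create-connection --provider ... --service-name ... --availability-zones ...`, and the per-source mapping is triggered by a `private.links` entry in the source's WITH clause.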