Skip to content

Commit

Permalink
refactor(sink): DRY SinkFormatterImpl::new (#16575)
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangjinwu authored and xxhZs committed May 6, 2024
1 parent 86a3310 commit a112185
Show file tree
Hide file tree
Showing 2 changed files with 197 additions and 207 deletions.
32 changes: 1 addition & 31 deletions src/connector/src/schema/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,42 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;
use std::sync::Arc;

use apache_avro::Schema as AvroSchema;

use super::loader::{LoadedSchema, SchemaLoader};
use super::loader::LoadedSchema;
use super::schema_registry::Subject;
use super::SchemaFetchError;

pub struct SchemaWithId {
pub schema: Arc<AvroSchema>,
pub id: i32,
}

/// Schema registry only
pub async fn fetch_schema(
format_options: &BTreeMap<String, String>,
topic: &str,
) -> Result<(SchemaWithId, SchemaWithId), SchemaFetchError> {
let loader = SchemaLoader::from_format_options(topic, format_options)?;

let (key_id, key_avro) = loader.load_key_schema().await?;
let (val_id, val_avro) = loader.load_val_schema().await?;

Ok((
SchemaWithId {
id: key_id,
schema: Arc::new(key_avro),
},
SchemaWithId {
id: val_id,
schema: Arc::new(val_avro),
},
))
}

impl LoadedSchema for AvroSchema {
fn compile(primary: Subject, _: Vec<Subject>) -> Result<Self, SchemaFetchError> {
AvroSchema::parse_str(&primary.schema.content)
Expand Down
Loading

0 comments on commit a112185

Please sign in to comment.