Skip to content

Commit

Permalink
chore: Bump OpenDAL to latest version (risingwavelabs#8481)
Browse files Browse the repository at this point in the history
Signed-off-by: Xuanwo <github@xuanwo.io>
  • Loading branch information
Xuanwo committed Mar 10, 2023
1 parent 38edae8 commit 6797904
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 24 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/object_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ fail = "0.5"
futures = { version = "0.3", default-features = false, features = ["alloc"] }
hyper = "0.14"
itertools = "0.10"
opendal = "0.27.2"
opendal = "0.30"
prometheus = { version = "0.13", features = ["process"] }
random-string = "1.0"
risingwave_common = { path = "../common" }
Expand Down
2 changes: 1 addition & 1 deletion src/object_store/src/object/opendal_engine/gcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ impl OpendalObjectStore {

builder.root(&root);

let op: Operator = Operator::create(builder)?.finish();
let op: Operator = Operator::new(builder)?.finish();
Ok(Self {
op,
engine_type: EngineType::Gcs,
Expand Down
35 changes: 17 additions & 18 deletions src/object_store/src/object/opendal_engine/opendal_object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use futures::future::try_join_all;
use futures::StreamExt;
use itertools::Itertools;
use opendal::services::Memory;
use opendal::Operator;
use opendal::{Metakey, Operator};
use tokio::io::AsyncRead;

use crate::object::{
Expand Down Expand Up @@ -47,7 +47,7 @@ impl OpendalObjectStore {
// Create memory backend builder.
let builder = Memory::default();

let op: Operator = Operator::create(builder)?.finish();
let op: Operator = Operator::new(builder)?.finish();
Ok(Self {
op,
engine_type: EngineType::Memory,
Expand All @@ -65,7 +65,7 @@ impl ObjectStore for OpendalObjectStore {
if obj.is_empty() {
Err(ObjectError::internal("upload empty object"))
} else {
self.op.object(path).write(obj).await?;
self.op.write(path, obj).await?;
Ok(())
}
}
Expand All @@ -81,15 +81,15 @@ impl ObjectStore for OpendalObjectStore {
match block {
Some(block) => {
let range = block.offset as u64..(block.offset + block.size) as u64;
let res = Bytes::from(self.op.object(path).range_read(range).await?);
let res = Bytes::from(self.op.range_read(path, range).await?);

if block.size != res.len() {
Err(ObjectError::internal("bad block offset and size"))
} else {
Ok(res)
}
}
None => Ok(Bytes::from(self.op.object(path).read().await?)),
None => Ok(Bytes::from(self.op.read(path).await?)),
}
}

Expand All @@ -114,20 +114,15 @@ impl ObjectStore for OpendalObjectStore {
));

let reader = match start_pos {
Some(start_position) => {
self.op
.object(path)
.range_reader(start_position as u64..)
.await?
}
None => self.op.object(path).reader().await?,
Some(start_position) => self.op.range_reader(path, start_position as u64..).await?,
None => self.op.reader(path).await?,
};

Ok(Box::new(reader))
}

async fn metadata(&self, path: &str) -> ObjectResult<ObjectMetadata> {
let opendal_metadata = self.op.object(path).metadata().await?;
let opendal_metadata = self.op.stat(path).await?;
let key = path.to_string();
let last_modified = match opendal_metadata.last_modified() {
Some(t) => t.unix_timestamp() as f64,
Expand All @@ -144,24 +139,28 @@ impl ObjectStore for OpendalObjectStore {
}

async fn delete(&self, path: &str) -> ObjectResult<()> {
self.op.object(path).delete().await?;
self.op.delete(path).await?;
Ok(())
}

/// Deletes the objects with the given paths permanently from the storage. If an object
/// specified in the request is not found, it will be considered as successfully deleted.
async fn delete_objects(&self, paths: &[String]) -> ObjectResult<()> {
self.op.batch().remove(paths.to_vec()).await?;
self.op.remove(paths.to_vec()).await?;
Ok(())
}

async fn list(&self, prefix: &str) -> ObjectResult<Vec<ObjectMetadata>> {
let mut object_lister = self.op.object(prefix).list().await?;
let mut object_lister = self.op.list(prefix).await?;
let mut metadata_list = vec![];
while let Some(obj) = object_lister.next().await {
let object = obj?;
let key = prefix.to_string();
let om = object.metadata().await?;

let om = self
.op
.metadata(&object, Metakey::LastModified | Metakey::ContentLength)
.await?;

let last_modified = match om.last_modified() {
Some(t) => t.unix_timestamp() as f64,
Expand Down Expand Up @@ -213,7 +212,7 @@ impl StreamingUploader for OpenDalStreamingUploader {
}

async fn finish(mut self: Box<Self>) -> ObjectResult<()> {
self.op.object(&self.path).write(self.buffer).await?;
self.op.write(&self.path, self.buffer).await?;

Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion src/object_store/src/object/opendal_engine/oss.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl OpendalObjectStore {
builder.endpoint(&endpoint);
builder.access_key_id(&access_key_id);
builder.access_key_secret(&access_key_secret);
let op: Operator = Operator::create(builder)?.finish();
let op: Operator = Operator::new(builder)?.finish();
Ok(Self {
op,
engine_type: EngineType::Oss,
Expand Down
2 changes: 1 addition & 1 deletion src/object_store/src/object/opendal_engine/webhdfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ impl OpendalObjectStore {
// NOTE: the root must be absolute path.
builder.root(&root);

let op: Operator = Operator::create(builder)?.finish();
let op: Operator = Operator::new(builder)?.finish();
Ok(Self {
op,
engine_type: EngineType::Webhdfs,
Expand Down

0 comments on commit 6797904

Please sign in to comment.