AWS Glue Catalog for Iceberg ingest extension #17392

Open
wants to merge 8 commits into
base: master
10 changes: 8 additions & 2 deletions docs/development/extensions-contrib/iceberg.md
@@ -31,12 +31,12 @@ Apache Iceberg is an open table format for huge analytic datasets. [IcebergInput

Iceberg manages most of its metadata in metadata files in the object storage. However, it is still dependent on a metastore to manage a certain amount of metadata.
Iceberg refers to these metastores as catalogs. The Iceberg extension lets you connect to the following Iceberg catalog types:

* Glue catalog
* REST-based catalog
* Hive metastore catalog
* Local catalog

- Druid does not support AWS Glue catalog yet.

For a given catalog, Iceberg input source reads the table name from the catalog, applies the filters, and extracts all the underlying live data files up to the latest snapshot.
The data files can be in Parquet, ORC, or Avro formats. The data files typically reside in a warehouse location, which can be in HDFS, S3, or the local filesystem.
The `druid-iceberg-extensions` extension relies on the existing input source connectors in Druid to read the data files from the warehouse. Therefore, the Iceberg input source can be considered as an intermediate input source, which provides the file paths for other input source implementations.
@@ -116,6 +116,12 @@ The `warehouseSource` is set to `local` because this catalog only supports readi
To connect to an Iceberg REST Catalog server, configure the `icebergCatalog` type as `rest`. The Iceberg REST Open API spec gives catalogs greater control over the implementation and in most cases, the `warehousePath` does not have to be provided by the client.
Security credentials may be provided in the `catalogProperties` object.

## Glue catalog

To connect to an AWS Glue catalog, configure the `icebergCatalog` type as `glue`. You must provide the `warehousePath`, and supply the remaining catalog settings in the `catalogProperties` object.
Refer to the [Iceberg Glue Catalog documentation](https://iceberg.apache.org/docs/1.6.0/aws/#glue-catalog) for the properties you can set.


## Downloading Iceberg extension

To download `druid-iceberg-extensions`, run the following command after replacing `<VERSION>` with the desired
5 changes: 5 additions & 0 deletions extensions-contrib/druid-iceberg-extensions/pom.xml
@@ -271,6 +271,11 @@
<artifactId>iceberg-hive-metastore</artifactId>
<version>${iceberg.core.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-aws</artifactId>
<version>${iceberg.core.version}</version>
</dependency>

<dependency>
<groupId>org.apache.hive</groupId>
@@ -29,6 +29,7 @@
import org.apache.druid.iceberg.input.IcebergInputSource;
import org.apache.druid.iceberg.input.LocalCatalog;
import org.apache.druid.iceberg.input.RestIcebergCatalog;
import org.apache.druid.iceberg.input.GlueIcebergCatalog;
import org.apache.druid.initialization.DruidModule;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -47,8 +48,8 @@ public List<? extends Module> getJacksonModules()
new NamedType(HiveIcebergCatalog.class, HiveIcebergCatalog.TYPE_KEY),
new NamedType(LocalCatalog.class, LocalCatalog.TYPE_KEY),
new NamedType(RestIcebergCatalog.class, RestIcebergCatalog.TYPE_KEY),
- new NamedType(IcebergInputSource.class, IcebergInputSource.TYPE_KEY)
-
+ new NamedType(IcebergInputSource.class, IcebergInputSource.TYPE_KEY),
+ new NamedType(GlueIcebergCatalog.class, GlueIcebergCatalog.TYPE_KEY)
)
);
}
@@ -0,0 +1,70 @@
package org.apache.druid.iceberg.input;

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.utils.DynamicConfigProviderUtils;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.aws.glue.GlueCatalog;
import javax.annotation.Nullable;
import java.util.Map;

public class GlueIcebergCatalog extends IcebergCatalog {
private static final String CATALOG_NAME = "glue";
private Catalog catalog;

public static final String TYPE_KEY = "glue";

@JsonProperty
private String warehousePath;

@JsonProperty
private Map<String, String> catalogProperties;

@JsonProperty
private final Boolean caseSensitive;
private static final Logger log = new Logger(GlueIcebergCatalog.class);

@JsonCreator
public GlueIcebergCatalog(
@JsonProperty("warehousePath") String warehousePath,
@JsonProperty("catalogProperties") @Nullable
Map<String, Object> catalogProperties,
@JsonProperty("caseSensitive") Boolean caseSensitive,
@JacksonInject @Json ObjectMapper mapper
)
{
this.warehousePath = Preconditions.checkNotNull(warehousePath, "warehousePath cannot be null");
this.catalogProperties = DynamicConfigProviderUtils.extraConfigAndSetStringMap(catalogProperties, DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, mapper);
this.caseSensitive = caseSensitive == null ? true : caseSensitive;
this.catalog = retrieveCatalog();
}

@Override
public Catalog retrieveCatalog() {
if (catalog == null) {
log.info("catalog is null, setting up default glue catalog.");
catalog = setupGlueCatalog();
}
log.info("Glue catalog set [%s].", catalog.toString());
return catalog;
}

private Catalog setupGlueCatalog() {
catalog = new GlueCatalog();
catalogProperties.put(CatalogProperties.WAREHOUSE_LOCATION, warehousePath);
catalog.initialize(CATALOG_NAME, catalogProperties);
Author

The catalog properties must include these key-value pairs:

    "type": "glue",
    "catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
    "io-impl": "org.apache.iceberg.aws.s3.S3FileIO",

Author

The warehouse path must have the form s3://bucket/path.

Author

AWS-related environment variables must be available wherever the Druid cluster is running.

Contributor

> AWS-related environment variables must be available wherever the Druid cluster is running.

Could we add more information about this to the docs for the Glue catalog?

Author

Yes, I will do that. I recently figured out that the Iceberg API itself offers a simpler approach to choosing the catalog. I am spending some time checking whether that would make this substantially more modular and let it work with all available Iceberg catalogs on the fly.

return catalog;
}

@Override
public boolean isCaseSensitive()
{
return caseSensitive;
}
}
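
The review comments on `setupGlueCatalog()` list three required properties and an `s3://` warehouse path, none of which the class currently checks. Below is a minimal sketch of how those requirements could be applied before `initialize` is called. The class name `GluePropertiesSketch` and the method `buildGlueProperties` are hypothetical and not part of this PR. The default values are copied from the author's comment rather than verified against Iceberg, and the `"warehouse"` key is assumed to match `CatalogProperties.WAREHOUSE_LOCATION`. The sketch uses only `java.util`, so it runs without Druid or Iceberg on the classpath.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of this PR: assembles the properties map
// that would be passed to the catalog's initialize(name, properties) call.
final class GluePropertiesSketch
{
  // Assumed to equal Iceberg's CatalogProperties.WAREHOUSE_LOCATION constant.
  static final String WAREHOUSE_KEY = "warehouse";

  static Map<String, String> buildGlueProperties(String warehousePath, Map<String, String> userProperties)
  {
    // Assumption taken from the review comment: the warehouse is an S3 location.
    if (warehousePath == null || !warehousePath.startsWith("s3://")) {
      throw new IllegalArgumentException("warehousePath must have the form s3://bucket/path");
    }
    // Copy into a fresh map so the caller's map is never mutated.
    Map<String, String> merged = new HashMap<>();
    // Defaults listed in the review comment; user-supplied values override them.
    merged.put("type", "glue");
    merged.put("catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog");
    merged.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO");
    if (userProperties != null) {
      merged.putAll(userProperties);
    }
    // warehousePath always wins, mirroring the put() in setupGlueCatalog().
    merged.put(WAREHOUSE_KEY, warehousePath);
    return merged;
  }

  public static void main(String[] args)
  {
    Map<String, String> props = buildGlueProperties("s3://bucket/path", new HashMap<>());
    System.out.println(props);
  }
}
```

Building a new map instead of calling `put` on the injected one keeps the helper safe if the incoming map is immutable or shared. Whether the defaults belong in code or only in the docs is a design choice for the PR author.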
@@ -0,0 +1,41 @@
package org.apache.druid.iceberg.input;


import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.FileUtils;
import org.junit.Assert;
import org.junit.Test;

import java.io.File;
import java.util.HashMap;

public class GlueIcebergCatalogTest {
private final ObjectMapper mapper = new DefaultObjectMapper();
public void setUp() throws Exception {
}

public void tearDown() throws Exception {
}

@Test
public void testCatalogCreate() {
GlueIcebergCatalog glueCatalog = new GlueIcebergCatalog(
"s3://testbucket/testpath",
new HashMap<>(),
true,
mapper
);
Assert.assertEquals("glue", glueCatalog.retrieveCatalog().name());
}
@Test
public void testIsCaseSensitive() {
GlueIcebergCatalog glueCatalog = new GlueIcebergCatalog(
"s3://testbucket/testpath",
new HashMap<>(),
true,
mapper
);
Assert.assertTrue(glueCatalog.isCaseSensitive());
}
}