
io.deltastandalone.springboot

Java CI with Maven

Delta Lake Format Use cases outside Spark

The Delta Lake format is becoming a new open-source standard for data analytics. But why?

  • ACID Support
  • Time Travel
  • Schema Evolution

Primarily, Delta Lake format standardization has been achieved with the Spark engine (big data engineering and analytical workloads). Can we read datasets stored in Delta format outside Spark using standard libraries? Yes. Delta Lake internally uses the Parquet format to store data and a transaction log to persist metadata about the operations performed on a given dataset. So if we can figure out how the transaction log (a.k.a. _delta_log) works, then the actual data files, stored in Parquet format, can be read using the apache-parquet & parquet-tools libraries.
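
For example, once a data file path is known, a single Parquet file can be read without Spark. The sketch below is illustrative only: the file path is hypothetical, and it assumes org.apache.parquet:parquet-avro and a Hadoop client are on the classpath.

    import org.apache.avro.generic.GenericRecord;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.avro.AvroParquetReader;
    import org.apache.parquet.hadoop.ParquetReader;
    import org.apache.parquet.hadoop.util.HadoopInputFile;

    public class ParquetReadSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // Hypothetical path to one data file referenced by the delta log
            Path file = new Path("/data/events/part-00000.snappy.parquet");
            try (ParquetReader<GenericRecord> reader =
                     AvroParquetReader.<GenericRecord>builder(HadoopInputFile.fromPath(file, conf)).build()) {
                GenericRecord record;
                while ((record = reader.read()) != null) {
                    System.out.println(record);
                }
            }
        }
    }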

What does the Transaction Log consist of?

Whenever a user modifies a dataset (insert/update/delete), Delta Lake breaks the operation into a series of steps composed of one or more actions. A few examples of actions are Add file, Remove file and Update metadata. These actions are recorded as atomic units called commits, each stored as a JSON file. Every commit on a dataset results in a JSON file (e.g. 000000.json), and every 10 commits are consolidated into a checkpoint file (e.g. 0000010.checkpoint.parquet).

Is the Transaction Log easy to understand?

Yes, each commit JSON file is very straightforward to understand. It contains the following metadata:

  • commitInfo – the commitInfo object has timestamp, type of operation, operation metrics, operation parameters, readVersion and isBlindAppend properties
  • Series of actions
    • If the action is add/remove, it has path, partitionValues (if any), size, modificationTime, dataChange and stats. The stats field contains the minimum and maximum values of all columns stored in the file.
    • If the action is update metadata/change protocol, the commitInfo is updated and dataChange is set to false on all files in the latest commit.
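
For illustration only, an abbreviated commit file for an append could look like the following (all values are hypothetical; each action is written as a separate JSON line):

    {"commitInfo":{"timestamp":1650000000000,"operation":"WRITE","operationParameters":{"mode":"Append"},"readVersion":0,"isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"100"}}}
    {"add":{"path":"date=2024-01-01/part-00000-1234.snappy.parquet","partitionValues":{"date":"2024-01-01"},"size":102400,"modificationTime":1650000000000,"dataChange":true,"stats":"{\"numRecords\":100,\"minValues\":{\"id\":1},\"maxValues\":{\"id\":100}}"}}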

Are there ways to read and interpret the Transaction Log?

Yes, there are a couple of ways to read and interpret the transaction log files under the _delta_log folder.

  • Since each commit is a plain JSON file (and each checkpoint a Parquet file), it is easy to read the contents directly. I would not prefer this approach, for two reasons:
    • The obvious one: rebuilding the metadata by reading the JSON content of every commit since the last checkpoint is cumbersome. Note that a change in the transaction log metadata structure can lead to read failures (read about the change protocol action).
    • The Delta format relies on optimistic concurrency. What happens if two or more users are reading the dataset while you are writing, or vice versa?
  • Is there a better way to read Delta format datasets? Yes, the Delta Standalone library can address the above issues while adhering to the change protocol and optimistic concurrency control.

What is Delta Standalone Library?

Delta Standalone is a single-node Java library that can be used to read from and write to Delta datasets on file storage. This file storage can be ADLS Gen2, Windows/Linux file systems, S3 buckets or any other file store that supports the HDFS APIs. It provides APIs to interact with a dataset's metadata in the transaction log, implementing the Delta Transaction Log Protocol to achieve the transactional guarantees of the Delta Lake format. The good part is that this library does not depend on Apache Spark and has only a few transitive dependencies, so it can be used from any compute layer (Web APIs, Azure Functions, Web Jobs, in combination with MPP systems such as SQL databases/data warehouses).

Where can I use this Library?

If you look at this library closely, you will notice that its power lies not in reading the actual data (though you can read the data) but in reading the metadata (the transaction log, a.k.a. _delta_log). Now let's define some use cases for this library:

  • Synapse Data Warehouse and other databases on the Azure stack can't read datasets in Delta format. Can we use this library to retrieve the file list, in conjunction with MPP or database systems that have the compute power to read Parquet files?
  • Can your background services, microservices and HTAP services read datasets from ADLS Gen2 or any other storage, instead of storing all the datasets in a SQL layer and duplicating data for transactional and analytical needs?
  • Can various ELT services leverage this library as a metadata layer and skip the use of Spark simply to read the delta log?

How to use Delta Standalone Library?

This library is simple to use. You need to know about three classes to successfully implement reads of a Delta dataset.

  • DeltaLog – the entry-point class to programmatically interact with the metadata in the transaction log (under the _delta_log folder) of a dataset. This class provides access to the Snapshot class in the context of reading a dataset.
  • Snapshot & DeltaScan – a Snapshot represents the state of a dataset at a specific version. The DeltaLog class also provides a way to read a particular version using getSnapshotForTimestampAsOf or getSnapshotForVersionAsOf. DeltaScan provides a memory-optimized iterator over the metadata files, optionally filtered by a partition predicate (partition pruning).
  • OptimisticTransaction – the main class for committing updates to the transaction log. During a transaction, all reads must go through the OptimisticTransaction instance instead of DeltaLog in order to detect conflicts and concurrent updates. A minimal read sketch using these classes is shown after this list.
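
Below is a minimal, self-contained read sketch. The table path is a placeholder, and the Hadoop Configuration would come from the pre-requisites section below:

    import org.apache.hadoop.conf.Configuration;

    import io.delta.standalone.DeltaLog;
    import io.delta.standalone.DeltaScan;
    import io.delta.standalone.Snapshot;
    import io.delta.standalone.actions.AddFile;
    import io.delta.standalone.data.CloseableIterator;

    public class DeltaReadSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();               // storage configuration (see pre-requisites)
            DeltaLog log = DeltaLog.forTable(conf, "/tmp/events");  // hypothetical dataset path

            Snapshot snapshot = log.update();                       // refresh to the latest version
            DeltaScan scan = snapshot.scan();                       // no partition pruning in this sketch

            // Iterate over the AddFile actions, i.e. the parquet files that make up this version
            try (CloseableIterator<AddFile> files = scan.getFiles()) {
                while (files.hasNext()) {
                    System.out.println(files.next().getPath());
                }
            }
            // For writes, log.startTransaction() returns an OptimisticTransaction
        }
    }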

Pre-requisites to set up the solution

  • Set the storage configuration for the storage account where the Delta datasets are stored. Refer to the method below, where the storage configuration is set up to use an ADLS Gen2 storage account. The method uses an application registration with the Storage Blob Data Contributor role to connect to the storage account. The application registration secret is stored in Key Vault, and the Key Vault credentials are stored in the application.properties file.
    public Configuration setStorageConfiguration(String storageAccountName, String storageConnectionSecret) throws IOException {
        // Fetch the app registration credentials (clientId, clientSecret, tenantId) from Key Vault
        JSONObject json = new JSONObject(_kvProvider.getStorageConnectionSecret(storageConnectionSecret));
        // Configure OAuth client-credential authentication for the ADLS Gen2 (ABFS) driver
        Configuration conf = new Configuration();
        conf.set("fs.azure.account.auth.type."+storageAccountName+".dfs.core.windows.net", "OAuth");
        conf.set("fs.azure.account.oauth.provider.type."+storageAccountName+".dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider");
        conf.set("fs.azure.account.oauth2.client.id."+storageAccountName+".dfs.core.windows.net", json.getString("clientId"));
        conf.set("fs.azure.account.oauth2.client.secret."+storageAccountName+".dfs.core.windows.net", json.getString("clientSecret"));
        conf.set("fs.azure.account.oauth2.client.endpoint."+storageAccountName+".dfs.core.windows.net", "https://login.microsoftonline.com/"+json.getString("tenantId")+"/oauth2/token");
        return conf;
    }
    • The Client Id, Client Secret and Tenant Id values are stored as a single secret in Key Vault. Store the secret in the following format:
      {
        "clientId": "",
        "clientSecret": "",
        "tenantId": ""
      }
      
    • Key Vault credentials are stored in the application.properties file within the project structure. Add the following properties to application.properties:

      azure.key-vault.clientId=
      azure.key-vault.clientSecret=
      azure.key-vault.endpoint=
      azure.key-vault.tenantId=

      The getStorageConnectionSecret(String storageConnectionSecret) method then uses these credentials to fetch the secret from Key Vault; a sketch is shown below.
      
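A minimal sketch of such a provider, assuming the Azure Key Vault Secrets SDK (com.azure:azure-security-keyvault-secrets and com.azure:azure-identity) and the four properties above injected via Spring's @Value. The class name and wiring are assumptions for this sketch, not necessarily how this project's _kvProvider is implemented:

    import java.io.IOException;

    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;

    import com.azure.identity.ClientSecretCredential;
    import com.azure.identity.ClientSecretCredentialBuilder;
    import com.azure.security.keyvault.secrets.SecretClient;
    import com.azure.security.keyvault.secrets.SecretClientBuilder;

    @Component
    public class KeyVaultSecretProvider {

        @Value("${azure.key-vault.clientId}")
        private String clientId;

        @Value("${azure.key-vault.clientSecret}")
        private String clientSecret;

        @Value("${azure.key-vault.tenantId}")
        private String tenantId;

        @Value("${azure.key-vault.endpoint}")
        private String endpoint;

        public String getStorageConnectionSecret(String storageConnectionSecret) throws IOException {
            // Authenticate to Key Vault with the app registration from application.properties
            ClientSecretCredential credential = new ClientSecretCredentialBuilder()
                    .clientId(clientId)
                    .clientSecret(clientSecret)
                    .tenantId(tenantId)
                    .build();
            SecretClient client = new SecretClientBuilder()
                    .vaultUrl(endpoint)
                    .credential(credential)
                    .buildClient();
            // The secret value is the JSON document shown above
            return client.getSecret(storageConnectionSecret).getValue();
        }
    }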

How to use Delta Standalone?

Now that you know the most important classes for reading the delta log, and the pre-requisites, let's get right into an example. This Delta Standalone example is wrapped in a Spring Boot application with a DatasetController class. DatasetController has several request mappings; one of them, getDatasetFilesToRead, returns the paths of the delta data files to read based on the inputs and configuration provided.

  • Request Mapping - getDatasetFilesToRead

    @RequestMapping(value = "/getDatasetFilesToRead", method = RequestMethod.POST, consumes = MediaType.APPLICATION_JSON_VALUE, produces = MediaType.APPLICATION_JSON_VALUE)
    @ResponseBody
    public DatasetResponse getDatasetFilesToRead(@RequestBody DatasetConfig datasetConfig) {
        try {
            return _datasetService.ProcessDataset(datasetConfig);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    • Initialize the DeltaLog class to read the dataset, using the storage configuration and user input. The line below uses the transitive Hadoop dependency 'org.apache.hadoop.conf.Configuration' so that the underlying log store (in this case, the Azure log store) API can connect to the storage account.
      DeltaLog log = DeltaLog.forTable(conf, datasetConfig.getDatasetPath());
    • Get the latest snapshot and the schema of the dataset, then apply the partition pruning rules (a sketch of building such a partition predicate is shown after this list)
      // Get the latest snapshot of the dataset
      // Future versions will support asOfVersion and timestamp
      Snapshot latestSnapshot = log.update();

      // Get the dataset schema of the latest snapshot
      StructType schema = latestSnapshot.getMetadata().getSchema();

      // Apply partition pruning to the dataset using the rules provided by the user.
      // Partition pruning filters the number of files based on how the dataset is partitioned.
      DeltaScan scan = applyPartitionPruningOnDataset(latestSnapshot, schema, datasetConfig.datasetRules);

      private DeltaScan applyPartitionPruningOnDataset(Snapshot latestSnapshot, StructType schema, ArrayList<DatasetRule> rules) {
          // If no partition rules are provided in the input, get the full dataset scan
          if (rules == null || rules.stream().noneMatch(r -> r.isPartitioned))
              return latestSnapshot.scan();
          else
              // Apply partition pruning on all partitioned columns provided in the input
              return latestSnapshot.scan(applyPartitionedColumnRules(schema, rules.stream().filter(r -> r.isPartitioned).collect(Collectors.toList())));
      }
    • Non-partition column data filtering (residual predicate) - TBD

      The above steps provide the list of files to read for a given version of the dataset.
  • Request Mapping - getDatasetRecords (TBD)
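
For reference, the partition predicate passed to Snapshot.scan can be built with Delta Standalone's expression API. The sketch below only illustrates what applyPartitionedColumnRules could look like: it assumes each DatasetRule exposes a columnName and a string value for string-typed partition columns, which is not necessarily how the real DatasetRule class is shaped.

    import java.util.List;

    import io.delta.standalone.expressions.And;
    import io.delta.standalone.expressions.EqualTo;
    import io.delta.standalone.expressions.Expression;
    import io.delta.standalone.expressions.Literal;
    import io.delta.standalone.types.StructType;

    // Hypothetical shape of a rule; field names are assumptions for this sketch
    class DatasetRule {
        public boolean isPartitioned;
        public String columnName;
        public String value;
    }

    class PartitionFilterSketch {
        // Combine one EqualTo expression per partitioned column into a single predicate
        static Expression applyPartitionedColumnRules(StructType schema, List<DatasetRule> partitionedRules) {
            Expression predicate = null;
            for (DatasetRule rule : partitionedRules) {
                // schema.column(...) resolves the column and its data type from the snapshot schema;
                // this sketch assumes string-typed partition columns
                Expression equalTo = new EqualTo(schema.column(rule.columnName), Literal.of(rule.value));
                predicate = (predicate == null) ? equalTo : new And(predicate, equalTo);
            }
            return predicate;
        }
    }

The resulting Expression is what latestSnapshot.scan(...) receives in applyPartitionPruningOnDataset above.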