Reuse all later tasks to keep the DRY principle? #40

Open · DOH-Manada opened this issue Aug 22, 2021 · 0 comments

I have a few tasks of the following, fairly standard nature: import a processed dataset, set up the input data, split it, then pass it to a model.

# _tasks.py

import d6tflow

class TaskLoadDataframe(d6tflow.tasks.TaskCachePandas):
    # loads a processed dataframe (probably pickled)
    def run(self):
        ...

@d6tflow.requires(TaskLoadDataframe)
class TaskSetupExogEndogData(d6tflow.tasks.TaskCache):
    # do stuff. Saves data and labels
    def run(self):
        ...

@d6tflow.requires({'inputs': TaskSetupExogEndogData})
class TaskSplitData(d6tflow.tasks.TaskCache):
    # do more stuff. Splits data and labels and saves to a dictionary
    def run(self):
        ...

# _tasks_sklearn.py

import _tasks

import d6tflow
from sklearn import svm

@d6tflow.requires(_tasks.TaskSplitData)
class TaskTrainSklearnSVM(d6tflow.tasks.TaskCache):
    kernel = d6tflow.Parameter(default='linear')
    
    def run(self):
        data = self.inputLoad()
        model_svm = svm.SVR(kernel=self.kernel)
        model_svm.fit(data['train_data'], data['train_labels'])
        score = model_svm.score(data['valid_data'], data['valid_labels'])
        # TODO: self.save model artifacts
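For the TODO, one way I could finish run() (a sketch: TaskCache keeps saved objects in memory, so a plain dict of artifacts should be fine):

        # replacing the TODO above:
        self.save({'model': model_svm, 'valid_score': score})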

Context: I would obviously want to reuse this pipeline as much as possible.

Question 1: Is it possible to create several independent dataset-processing tasks, any of which can be set as the "initial task" of this workflow?
Question 2: If yes, how would I call that as a dynamic requirement in TaskLoadDataframe?
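
Something like the following is what I have in mind (a rough sketch: TaskPreprocessCsv and TaskPreprocessSql are hypothetical names, and a plain luigi-style requires() replaces the @d6tflow.requires decorator):

import d6tflow

class TaskPreprocessCsv(d6tflow.tasks.TaskCachePandas):
    def run(self):
        ...  # build the dataframe from a csv source

class TaskPreprocessSql(d6tflow.tasks.TaskCachePandas):
    def run(self):
        ...  # build the dataframe from a sql source

class TaskLoadDataframe(d6tflow.tasks.TaskCachePandas):
    source = d6tflow.Parameter(default='csv')

    def requires(self):
        # choose the "initial task" at run time instead of hard-coding it
        return {'csv': TaskPreprocessCsv, 'sql': TaskPreprocessSql}[self.source]()

    def run(self):
        self.save(self.inputLoad())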


Solution A:
It seems the best way to handle this within the scope of this package is not to create a separate upstream TaskA and instead do the following:

  1. Preprocess the dataframe.
  2. Export it to a pickle (or csv).
  3. Pass the path of the exported file as a parameter to TaskLoadDataframe (sketched below), then run the workflow and continue on.
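
Roughly like this (path_processed and the example path are made-up names, and I'm assuming parameters propagate up the @d6tflow.requires chain so the downstream task can receive them):

import d6tflow
import pandas as pd

class TaskLoadDataframe(d6tflow.tasks.TaskCachePandas):
    path_processed = d6tflow.Parameter()

    def run(self):
        # step 3: load the file exported in step 2
        self.save(pd.read_pickle(self.path_processed))

d6tflow.run(TaskTrainSklearnSVM(path_processed='data/processed.pkl'))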

Solution B:
Solution B: I know Luigi doesn't allow passing dataframes as parameters, but could I pass a dataframe into a task's run() as a way of reducing or completely removing the file I/O in step 2?

class TaskLoadDataframe(d6tflow.tasks.TaskCachePandas):

    def run(self, dataframe):
        self.save(dataframe)

I don't think the source code allows for this; if it does, what would the syntax be to run that as a workflow?
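
The closest syntax I can imagine (untested, and assuming that calling .save() on a task instance outside of run() writes to its target and marks the task complete) would be:

df_processed = ...  # the dataframe already sitting in memory

TaskLoadDataframe().save(df_processed)  # inject the data, skipping run()
d6tflow.run(TaskTrainSklearnSVM())      # downstream tasks load it as usual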

Solution B (reprise): Alternatively, I could save the processed dataframe in a dictionary and pass it into TaskLoadDataframe as a d6tflow-defined parameter.
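
Sketching that idea (with the caveat that luigi's DictParameter round-trips through JSON, so the dict would have to hold plain data rather than a raw DataFrame):

import d6tflow
import luigi
import pandas as pd

class TaskLoadDataframe(d6tflow.tasks.TaskCachePandas):
    payload = luigi.DictParameter()  # e.g. {'x': [1, 2], 'y': [3, 4]}

    def run(self):
        # rebuild the dataframe from the JSON-serializable dict
        self.save(pd.DataFrame(dict(self.payload)))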

Thoughts? Great work on this by the way.
