Need to build a non-ML data pipeline, is DVC a good fit?

I need to build a data pipeline that basically goes like this:

  1. download new data hourly, occasionally creating a new file in a directory
  2. extract the data into a usable form, creating a new file in an output directory
  3. munch on all of the data for any day that has any changed hourly data
  4. position the changed output files into a directory for the on-line query service

As I see it, having an entire directory as an output for steps 1, 2, and 3, and entire directories as dependencies for steps 2, 3, and 4, could work, but the processing steps would need to examine their input directories to see what has changed since they last ran. I would prefer to push that logic into the workflow manager.
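
Concretely, I imagine option 1 looking something like this in dvc.yaml (the script names and paths are placeholders I made up just to illustrate the directory-level deps and outs):

```yaml
stages:
  download:
    cmd: python download.py data/raw/            # hypothetical script that fetches new hourly files
    outs:
      - data/raw/                                # new files accumulate here over time
  extract:
    cmd: python extract.py data/raw/ data/extracted/
    deps:
      - data/raw/
    outs:
      - data/extracted/
  daily:
    cmd: python daily.py data/extracted/ data/daily/
    deps:
      - data/extracted/
    outs:
      - data/daily/
  publish:
    cmd: python publish.py data/daily/ serve/
    deps:
      - data/daily/
    outs:
      - serve/
```

The catch is that each command sees only whole directories, so extract.py, daily.py, and publish.py would each have to work out for themselves which files are new since the last run.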

An alternative would be to add dependencies or outputs each time a new file appears, but that seems like just as much work as managing the workflow manually, and it leads to a kind of ugly DAG. At least I could intelligently version workflow steps this way, unlike option 3.

A third option would be to create an entire dependency chain for each new file downloaded by step (1). This leads to a really furry processing DAG that will be hard to understand and nearly impossible to change consistently.

Workflow engines like Airflow, Luigi, and Pachyderm might be alternatives if I am trying to force-fit DVC into a round hole, but I really like DVC’s general philosophy.

What is the best way to handle this kind of problem? Is DVC appropriate?

Hi,

Sorry, I didn’t understand that sentence. Not sure it’s key, but feel free to clarify what that stage does and what its output is.

dvc repro can do this for you, though. DVC calculates a content hash for every dependency and only runs the stages downstream of the changes. Hash calculation can take time, however, especially with large files and/or growing directories.

Not recommended. The pipeline execution probably shouldn’t redefine the pipeline’s own definition (that raises reproducibility questions). You can, however, use values from params.yaml as dvc.yaml variables and foreach lists to expand them into stages. That would be a combination of alternatives 2 and 3.
See dvc.yaml Files
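
For example, with a list defined in params.yaml (the script and file names below are just illustrative):

```yaml
# params.yaml
hourly_files:
  - 2021-06-01-00.json
  - 2021-06-01-01.json
```

```yaml
# dvc.yaml
stages:
  extract:
    foreach: ${hourly_files}          # one stage is generated per list item
    do:
      cmd: python extract.py data/raw/${item} data/extracted/${item}
      deps:
        - data/raw/${item}
      outs:
        - data/extracted/${item}
```

Adding a file name to the list in params.yaml produces a new extract@<file> stage, but something (or someone) still has to append to that list each hour, which is where the reproducibility question above comes in.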

Another question is when/if you’ll be committing changes to Git (marking a new project version).

Your application sounds like a production ETL flow (from my limited experience). We’re growing in the direction of DVC serving as a production-capable system to better cover these scenarios, but it was originally concerned mostly with defining, versioning, and reproducing data science development pipelines.

Ultimately it’s up to you to decide whether DVC is appropriate. Please let us know.

Hope that helps

In case it’s a helpful addition to what @jorgeorpinel already wrote, you could take a look at Incremental processing or streaming in micro-batches · Discussion #5917 · iterative/dvc · GitHub if you want to read the discussion and feature requests around similar workflows. To summarize the current state of DVC: if you have a directory as a dependency, there isn’t really a way to have your stage run only on the newly added or changed data within that directory.


Yes. Well. Ahem.

I think I can understand why you didn’t understand it. I wouldn’t understand it if I hadn’t written it.

What I mean is that a processing step should be able to detect when a new file appears in an input directory and only process that file (or files).

@dberenbaum addresses this (sadly, in the negative) in the next answer.

That is a very helpful link. Exactly what I was looking for.

Very frustrating. DVC is so very close to what I need.

My next thought would be whether there is enough change control to have an external entity see the changed file and augment the DAG. Or another option, I suppose, is to write a simple library that remembers which files you have already processed … which is essentially an implementation of micro-batching by hand.

I have been thinking about this, and I think that DVC actually does contain nearly all of the mechanism needed, even if it appears otherwise on the surface.

I think that the key problem is that I (and others) are thinking of the directory full of input files as the dependency. One way around that is to interpose an inventory step before and after the work step that I want to perform. The input directory is a dependency of the inventory step, and an inventory of its files is the output. This inventory file is then the dependency of my work step.

Whenever the input directory changes, the inventory step would scan the entire directory and produce a list of old and new files (the inventory).

An update to this inventory would trigger my work step. I can use the inventory file to process only the new files, but my work step would have only the output directory as an output. My changes to the output directory would likely trigger another inventory step.

The only extension would be to allow either the inventory step or my processing step to maintain a state file that is checked out before my work step and checked back in after. This looks like a circular dependency, but I would contend that it does not actually impair the correct functioning or reproducibility of the entire DAG, as long as the batch information is retained in the state file. Limiting the scope of the state file would further aid reasoning about this.

One big opportunity for optimization would be to memoize the combination of input + state + code hashes => output hash. This would mean that DAGs that replicate a lot of functionality could be “run” very quickly, because the framework could detect redundant processing and simply materialize the expected output from the cache. Backfill is also facilitated, because we can simply revert an inventory file to a desired state and the framework will roll forward. If hashes are computed on each output file check-in, then we can detect cases of unimportant updates. This means that if an input file batch produces the same work product, all downstream processing for that DAG can be avoided since there are no changes, and the hashing of outputs on all replicated DAGs can also be avoided by memoization.


Yes, helper stages that create summary or config files can be useful.

Not sure which one is the “work step”, but keep in mind that stage definitions cannot have overlapping outputs. Only the first step can specify the directory (where data files are added periodically) as its output.

Circular dependencies are also not supported :sweat_smile: (except as experiment checkpoints), precisely because, as you suggest, they would prevent DVC from reconstructing a finite DAG to reproduce the pipeline (the chain of outs and deps is the only stage order that matters to DVC).

DVC does this! :slightly_smiling_face: It’s called the run-cache. It can even be pushed/pulled to/from remote storage.

Please let us know if you try any of these strategies and have any learnings to share :grin:

Noted. Thanks.

The idea is that I would have two directories, which we can call IN/ and OUT/. The execution would proceed as follows:

  1. inventory step to produce a list of files in an inventory file
  2. work step with inventory file as dependency. Read new files in IN/ and produce new files in OUT/
  3. next inventory step to produce a list of files in a second inventory file

As you can see, there would be no overlapping outputs.
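
In dvc.yaml terms (with made-up script names), I am picturing something like:

```yaml
stages:
  inventory_in:
    cmd: python inventory.py IN/ inventory_in.txt
    deps:
      - IN/                     # any change in IN/ refreshes this inventory
    outs:
      - inventory_in.txt
  work:
    cmd: python work.py inventory_in.txt IN/ OUT/
    deps:
      - inventory_in.txt        # only the inventory, not IN/ itself, triggers the work step
    outs:
      - OUT/                    # might need DVC's persist flag so OUT/ isn't cleaned before each run
  inventory_out:
    cmd: python inventory.py OUT/ inventory_out.txt
    deps:
      - OUT/
    outs:
      - inventory_out.txt
```

work.py would compare the current inventory against the previous one (the state file idea above) to decide which files in IN/ still need to be processed.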

For the purposes of inducing a topological ordering on processing steps, this would not be a circular dependency. Such a state file is only viewed or updated by a single step and does not change the order in which the work should be done.

Moreover, since the state file is versioned, the work step is idempotent as well. Given the dependencies and state file from a particular version, the processing step will produce identical next state and outputs.

Ah… that is good news. (I have a new reading assignment to learn how the run-cache is indexed).

Well, actually, I have used this exact strategy before, and the results were really excellent, because it gave me the file-level incrementality that I needed most while still giving me the ability to reason about the state of all of my data. What I was missing back then was the version control and the run-cache, which are both primary features of DVC. I think that the combination of these ideas does not compromise the core ideas of DVC but extends them to an important use case.

The reason that I think this is an important use case for machine learning is that it is very common that I don’t have a static, academic sort of machine learning problem where I iterate on the learning parameters and features by monitoring a leaderboard. What I have always had in industrial settings is a continuously growing set of training data and unlabeled data. I needed to evaluate old models on newly labeled data and build new models that I would evaluate on new and old versions of the data. Scoring unlabeled data would drive new labeling efforts through stratified sampling.

As such, extracting and extending transaction histories from log files and scoring and training models from the new data were critical capabilities. There was no bright line between “data engineering” and “real data science”. There was also no clear line between data engineering for analytics versus data engineering for monitoring and BI.

So, from my own experience of a quarter century of building industrial machine learning systems, incremental processing falls squarely into what I would see as DVC’s mission. The folks building DVC might differ, but there isn’t much of a way for me to use DVC otherwise, since my entire (data) world is incremental.
