# Data Factory and Flows Design - Oct 2017 to Apr 2018

Date: 2018-04-08

> [!note]
> This is miscellaneous content from various HackMD docs. I'm preserving it because either a) there is material to reuse here that I'm not sure is elsewhere or b) there were various ideas in here that we used later (and it's useful to see their origins).
>
> Key content:
>
> * March-April 2018: first planning of what became dataflows (it had various names, including dataos). A lot of my initial ideas ended up in this micro-demo https://github.com/datopian/dataflow-demo. This evolved with Adam into https://github.com/datahq/dataflows
> * Autumn 2017: planning of Data Factory, which was the data processing system inside DataHub.io. This was more extensive than dataflows (e.g. it included a runner, an assembler etc) and was based on the original datapackage-pipelines and its runner. Issues with that system were part of the motivation for starting work on dataflows.
>
> ~Rufus May 2020

## Plan April 2018

* tutorial (what we want our first post to look like)
* And then implement the minimum for that
* programmatic use of pipelines and processors in DPP
* processor abstraction defined ...
* DataResource and DataPackage objects that look like the [Frictionless Lib pattern][frictionless-lib]
* processors library split out
* code runner you can call: dataos.run.runSyncPipeline (see the sketch below)
* dataflow init => python and yaml
* @adam Write up Data Factory architecture and naming as it currently stands [2h]

[frictionless-lib]: http://okfnlabs.org/blog/2018/02/15/design-pattern-for-a-core-data-library.html
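To make the "programmatic use of pipelines" item concrete, here is a minimal sketch of what a synchronous runner call might look like. The names (`run_sync_pipeline`, the toy processors) are hypothetical illustrations of the idea, not the actual DPP or dataos API.

```python
# Hypothetical sketch: a synchronous, programmatic pipeline runner of the
# kind the plan above calls for. A pipeline is a linear list of processors,
# each a transform over a stream of rows.
from typing import Callable, Iterable, Iterator

Row = dict
Processor = Callable[[Iterator[Row]], Iterator[Row]]

def run_sync_pipeline(source: Iterable[Row], steps: list[Processor]) -> list[Row]:
    """Thread the row stream through each processor in order and collect it."""
    stream: Iterator[Row] = iter(source)
    for step in steps:
        stream = step(stream)
    return list(stream)

# Two toy processors, to show the shape.
def drop_negative(rows):
    return (row for row in rows if row["value"] >= 0)

def add_doubled(rows):
    for row in rows:
        yield {**row, "doubled": row["value"] * 2}

if __name__ == "__main__":
    data = [{"value": 2}, {"value": -1}, {"value": 5}]
    print(run_sync_pipeline(data, [drop_negative, add_doubled]))
    # [{'value': 2, 'doubled': 4}, {'value': 5, 'doubled': 10}]
```

dataflows later settled on a very similar chain-of-steps model (`Flow(load(...), step, dump_to_path(...))`).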
## 8 April 2018

Lots of notes on DataFlow which are now moved and refactored into https://github.com/datahq/dataflow-demo

The Domain Model of Factory:

* Staging area
* Planner
* Runner
* Flow
* Pipelines
* Processors

```mermaid
graph TD

flow[Flow]
pipeline[Pipelines]
processor[Processors]

flow --> pipeline
pipeline --> processor
```

Assembler ...

```mermaid
graph LR

source[Source from User<br/>source.yaml] --assembler--> plan[Execution Plan<br/>DAG of pipelines]
plan --> pipeline[Pipeline Runner<br/>Optimizer/Dependency management]
```

=> Assembler generates a DAG (a sketch of its output follows this list).

- dest filenames in advance ...
- for each pipeline: the pipelines it depends on
  - e.g. sqlite: depends on all derived csv pipelines running
  - node: depends on all csv and all json pipelines running
  - zip: depends on all csv running
- Pipelines
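A minimal sketch of what the assembler's execution plan could look like for the dependencies listed above: the DAG as a mapping from pipeline id to the pipeline ids it depends on. The pipeline names and the dict representation are assumptions for illustration, not the actual assembler output format.

```python
# Hypothetical execution plan: adjacency list mapping each pipeline to its
# dependencies, matching the sqlite/node/zip rules in the list above.
from graphlib import TopologicalSorter  # stdlib, Python 3.9+

plan = {
    "derived/csv": [],                              # built straight from source
    "derived/json": [],
    "derived/sqlite": ["derived/csv"],              # waits for all derived csv
    "derived/node": ["derived/csv", "derived/json"],
    "derived/zip": ["derived/csv"],
}

# Dependency management then reduces to a topological order over the plan.
print(list(TopologicalSorter(plan).static_order()))
# e.g. ['derived/csv', 'derived/json', 'derived/sqlite', 'derived/node', 'derived/zip']
```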
```mermaid
graph LR

source[Source Spec Parse<br/>validate?] --> planner[Planner]
planner --> workplanb(Plan of Work<br/><br/>DAG of pipelines)

subgraph Planner
planner
subplan1[Sub Planner 1 e.g. SQLite]
end
```

## 5 Oct 2017

Notes:

* Adam: finding more and more bugs (edge cases) and then applying fixes, but then more issues appear
* => the internal data model of pipelines was wrong ...
* The original data model has a store with one element: the pipeline plus a state (it's idle, invalid, waiting to be executed, running, dirty)
* The problem starts when you have a very long pipeline ...
* something changes and the pipeline gets re-added to the queue. Then you have the same pipeline in the queue in two different states. It should not be a state of the pipeline but a state of the execution of the pipeline.
* Split model: pipelines (with their hash) + "runs" ordered by time of request (sketched below)
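A minimal sketch of that split model, under assumed names: the pipeline is an immutable record identified by a content hash, while all execution state lives on separate, time-ordered run records.

```python
# Hypothetical sketch of the split model: state belongs to a Run (one
# execution of a pipeline), never to the Pipeline itself.
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum

class RunState(Enum):
    QUEUED = "queued"
    RUNNING = "running"
    SUCCEEDED = "succeeded"
    FAILED = "failed"

@dataclass(frozen=True)
class Pipeline:
    id: str    # hash of the pipeline spec (see the hashing sketch further below)
    spec: dict # the processors and their parameters

@dataclass
class Run:
    pipeline_id: str
    state: RunState = RunState.QUEUED
    requested_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))

# The same pipeline can now sit in the queue twice without conflicting
# states: each request is its own Run, ordered by requested_at.
runs = [Run("abc123"), Run("abc123", state=RunState.RUNNING)]
```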
Questions:

* Tests in assembler ...

### Domain Model

```mermaid
graph TD

flow[Flow]
pipeline[Pipelines]
processor[Processors]

flow --> pipeline
pipeline --> processor
```

* Pipelines have no branches: they are always linear
* Input: nothing, a file, or a datapackage (source is a stream or nothing)
* Output: a datapackage - usually dumped to something (could be a stream)
* Pipelines are a list of processors **and** their inputs
* A Flow is a DAG of pipelines
* In our case: one flow produces a "Dataset" at a given "commit/run"
* Source Spec + DataPackage[.json] => (via assembler) => Flow Spec
* Runner
* Pipelines runner: a set of DAGs of pipelines, where each pipeline is scheduled to run once all its dependencies have run (see the sketch after this list)
* Events => lead to new flows or pipelines being created ... (or existing ones being stopped or destroyed)
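A minimal sketch of that "run once all dependencies have run" scheduling rule, using the stdlib's ready/done protocol; the pipeline ids are illustrative and correspond to flow2 in the diagram below.

```python
# Hypothetical pipelines runner: repeatedly take every pipeline whose
# dependencies have all completed, run it, mark it done.
from graphlib import TopologicalSorter

def run_flow(dag: dict[str, list[str]], run_pipeline) -> None:
    """dag maps pipeline id -> ids it depends on; run_pipeline executes one."""
    ts = TopologicalSorter(dag)
    ts.prepare()
    while ts.is_active():
        for pipeline_id in ts.get_ready():  # all pipelines whose deps are done
            run_pipeline(pipeline_id)       # could run these in parallel
            ts.done(pipeline_id)

# flow2 below: z depends on x and y, so x and y run first.
run_flow({"x": [], "y": [], "z": ["x", "y"]}, run_pipeline=print)
```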
```mermaid
graph LR

subgraph flow2
x --> z
y --> z
end

subgraph flow1
a --> c
b --> c
end
```

State of Factory(flows, state)

f(event, state) => state

flow dependencies?

Desired properties:

* We economise on runs: we don't rerun processors (pipelines?) that have the same config and input data
* => one reason for breaking down into smaller "operators" is that we economise here ...
* Simplicity: the system is understandable ...
* Processors (pipelines) are atomic - they get their configuration and run ...
* We can generate a full set of pipelines / processors from a source spec and an original datapackage.json.
* Pipelines as templates vs pipelines as instances ...

pipeline id = hash(pipeline spec, datapackage.json)

Outputs live under {pipelineid}/... so the next pipeline can rely on {pipelineid}
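A minimal sketch of that content-addressed id, assuming the hash is taken over a canonical JSON serialisation of the pipeline spec plus the input datapackage.json; the exact serialisation and hash function are assumptions.

```python
# Hypothetical sketch: derive a stable pipeline id by hashing the pipeline
# spec together with the input datapackage.json. Same config + same input
# => same id => the run can be skipped (never rerun once run).
import hashlib
import json

def pipeline_id(pipeline_spec: dict, datapackage: dict) -> str:
    # Canonical serialisation so key order doesn't change the hash.
    payload = json.dumps([pipeline_spec, datapackage], sort_keys=True)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()[:12]

spec = {"steps": [{"processor": "dump_to_csv"}]}
dp = {"name": "vix-daily", "resources": [{"path": "vix-daily.csv"}]}
print(pipeline_id(spec, dp))  # stable 12-char id, identical on every call
```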
Planner ...

=> a pipeline is never rerun (once it has been run)

|              | Factory                    | Airflow   |
|--------------|----------------------------|-----------|
| DAG          | Implicit (no concept)      | DAG       |
| Node         | Pipelines (or processors?) | Operators |
| Running Node | ? Running pipelines        | Tasks     |
| Comments     |                            |           |

https://airflow.incubator.apache.org/concepts.html#workflows

# Analysis from work for Preview

As a Publisher I want to upload a 50Mb CSV so that the showcase page works - it does not crash my browser (because it is trying to load and display 50Mb of CSV)

*plus*

As a Publisher I want to customize whether I generate a preview for a file or not so that I don't get inappropriate previews

> As a Publisher I want to have an SQLite version of my data auto-built
>
> As a Publisher I want to upload an Excel file and have csv versions of each sheet and an sqlite version

### Overview

*This is what we want*

```mermaid
graph LR

subgraph Source
file[vix-daily.csv]
view[timeseries-1<br/>Uses vix-daily]
end

subgraph "Generated Resources"
gcsv[derived/vix-daily.csv]
gjson[derived/vix-daily.json]
gjsonpre[derived/vix-daily-10k.json]
gjsonpre2[derived/view-time-series-1.json]
end

subgraph "Generated Views"
preview[Table Preview]
end

file --rule--> gcsv
file --rule--> gjson
file --rule--> preview
view --> gjsonpre2
preview --> gjsonpre
```

### How does this work?

#### Simple example: no previews, single CSV in source
```mermaid
graph LR

load[Load Data Package<br/>Parse datapackage.json]
parse[Parse source data]
dump((S3))

load --> parse
parse --> dcsv
parse --> djson
parse --> dumpdp
dcsv --> dump
djson --> dump
dsqlite --> dump
dnode --> dump
dumpdp --> dump
dcsv --> dnode
dcsv --> dsqlite

subgraph "Dumpers 1"
dcsv[Derived CSV]
djson[Derived JSON]
end

subgraph "Dumpers 2 - after Dumper 1"
dsqlite[SQLite]
dnode[Node]
end

dumpdp[Derived DataPackage.json<br/><br/>Assembler gives it the DAG info<br/>Runs after everything<br/>as it needs size, md5 etc]
```

```yaml=
meta:
  owner:
  ownerid:
  dataset:
  version: 1
  findability:
inputs:
  - # only one input is supported atm
    kind: datapackage
    url:
    parameters:
      resource-mapping:
        :
outputs:
  - ...
```

See https://github.com/datahq/pm/issues/17
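As a concrete anchor, a minimal sketch of loading and sanity-checking a source spec like the one above; the checks mirror the constraints noted in the example (single input, kind datapackage), while the URL and everything else are assumptions.

```python
# Hypothetical sketch: load a source.yaml spec and check the fields the
# assembler would need before planning. Requires PyYAML (pip install pyyaml).
import yaml

SPEC = """
meta:
  owner: core
  dataset: vix-daily
  version: 1
inputs:
  - kind: datapackage
    url: https://example.com/vix-daily/datapackage.json
"""

def parse_source_spec(text: str) -> dict:
    spec = yaml.safe_load(text)
    if len(spec.get("inputs", [])) != 1:
        raise ValueError("only one input is supported atm")
    if spec["inputs"][0].get("kind") != "datapackage":
        raise ValueError("input kind must be 'datapackage'")
    return spec

print(parse_source_spec(SPEC)["inputs"][0]["url"])
```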
#### With previews

```mermaid
graph LR

load[Load Data Package<br/>Parse datapackage.json]
parse[Parse source data]
dump((S3))

load --> parse
parse --> dcsv
parse --> djson
parse --> dumpdp
dcsv --> dump
djson --> dump
dsqlite --> dump
dnode --> dump
dumpdp --> dump
dcsv --> dnode
dcsv --> dsqlite
parse --> viewgen
viewgen --> previewgen
previewgen --view-10k.json--> dump

subgraph "Dumpers 1"
dcsv[Derived CSV]
djson[Derived JSON]
end

subgraph "Dumpers 2 - after Dumper 1"
dsqlite[SQLite]
dnode[Node]
viewgen[Preview View Gen<br/>Adds preview views]
previewgen[View Resource Generator]
end

dumpdp[Derived DataPackage.json<br/><br/>Assembler gives it the DAG info<br/>Runs after everything<br/>as it needs size, md5 etc]
```

### With Excel (multiple sheets)

Source is vix-daily.xls (with 2 sheets)
```mermaid
graph LR

load[Load Data Package<br/>Parse datapackage.json]
parse[Parse source data]
dump((S3))
dumpdp[Derived DataPackage.json<br/><br/>Assembler gives it the DAG info<br/>Runs after everything<br/>as it needs size, md5 etc]

load --> parse
parse --> d1csv
parse --> d1json
parse --> d2csv
parse --> d2json
parse --> dumpdp
d1csv --> dump
d1json --> dump
d2csv --> dump
d2json --> dump
dsqlite --> dump
dnode --> dump
dumpdp --> dump
d1csv --> dnode
d1csv --> dsqlite
d2csv --> dnode
d2csv --> dsqlite
d1csv --> view1gen
d2csv --> view2gen
view1gen --> preview1gen
view2gen --> preview2gen
preview1gen --view1-10k.json--> dump
preview2gen --view2-10k.json--> dump

subgraph "Dumpers 1 sheet 1"
d1csv[Derived CSV]
d1json[Derived JSON]
end

subgraph "Dumpers 1 sheet 2"
d2csv[Derived CSV]
d2json[Derived JSON]
end

subgraph "Dumpers 2 - after Dumper 1"
dsqlite[SQLite]
dnode[Node]
view1gen[Preview View Gen<br/>Adds preview views]
preview1gen[View Resource Generator]
view2gen[Preview View Gen<br/>Adds preview views]
preview2gen[View Resource Generator]
end
```

datapackage.json

```javascript=
{
  "resources": [
    {
      "name": "mydata",
      "path": "mydata.xls"
    }
  ]
}
```

```yaml=
meta:
  owner:
  ownerid:
  dataset:
  version: 1
  findability:
inputs:
  - # only one input is supported atm
    kind: datapackage
    url:
    parameters:
      resource-mapping:
        :
processing:
  - input: # mydata
    output: # mydata_sheet1
    tabulator:
      sheet: 1
  - input: # mydata
    output: # mydata_sheet2
    tabulator:
      sheet: 2
```

```yaml=
meta:
  owner:
  ownerid:
  dataset:
  version: 1
  findability:
inputs:
  - # only one input is supported atm
    kind: datapackage
    url:
    parameters:
      resource-mapping:
        :
# an excel file => (implicitly and in the cli becomes ...) ...
processing:
  - input: # mydata
    output: # mydata-sheet1
    tabulator:
      sheet: 1
```

Result

```javascript=
{
  "resources": [
    {
      "name": "mydata",
      "path": "mydata.xls"
    },
    {
      "path": "derived/mydata.xls.sheet1.csv",
      "datahub": {
        "derivedFrom": "mydata"
      }
    },
    {
      "path": "derived/mydata.xls.sheet2.csv",
      "datahub": {
        "derivedFrom": "mydata"
      }
    }
  ]
}
```
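A minimal sketch of the expansion step implied above: turning each `processing` entry into a derived resource carrying `derivedFrom`, which is one way the Result datapackage.json could be produced. The `derived/….sheetN.csv` naming and `datahub.derivedFrom` field come from the example; the function itself is an assumption.

```python
# Hypothetical sketch: expand `processing` entries (one per Excel sheet)
# into derived resources, as in the Result datapackage.json above.
def add_derived_resources(datapackage: dict, processing: list[dict]) -> dict:
    source = datapackage["resources"][0]  # e.g. mydata.xls
    derived = [
        {
            "path": "derived/{}.sheet{}.csv".format(
                source["path"], step["tabulator"]["sheet"]
            ),
            "datahub": {"derivedFrom": source["name"]},
        }
        for step in processing
    ]
    return {**datapackage, "resources": [source, *derived]}

dp = {"resources": [{"name": "mydata", "path": "mydata.xls"}]}
steps = [{"tabulator": {"sheet": 1}}, {"tabulator": {"sheet": 2}}]
print(add_derived_resources(dp, steps))
# resources: mydata.xls, derived/mydata.xls.sheet1.csv, derived/mydata.xls.sheet2.csv
```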
### Overall component design

Assembler ...

```mermaid
graph LR

source[Source from User<br/>source.yaml] --assembler--> plan[Execution Plan<br/>DAG of pipelines]
plan --> pipeline[Pipeline Runner<br/>Optimizer/Dependency management]
```

=> Assembler generates a DAG.

- dest filenames in advance ...
- for each pipeline: the pipelines it depends on
  - e.g. sqlite: depends on all derived csv pipelines running
  - node: depends on all csv and all json pipelines running
  - zip: depends on all csv running
- Pipelines

```mermaid
graph LR

source[Source Spec Parse<br/>validate?] --> planner[Planner]
planner --> workplanb(Plan of Work<br/><br/>DAG of pipelines)

subgraph Planner
planner
subplan1[Sub Planner 1 e.g. SQLite]
end
```

### NTS

```
function(sourceSpec) => (pipelines, DAG)

pipeline
  pipeline-id
  steps = n * processor
    parameters
  schedule
  dependencies
```
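To pin down that NTS signature, a minimal sketch of a planner function of the same shape, under the assumption that a pipeline is an id plus an ordered list of (processor, parameters) steps and the DAG is a dependency mapping; all names and the two example pipelines are illustrative.

```python
# Hypothetical sketch of function(sourceSpec) => (pipelines, DAG).
from typing import Any

Steps = list[tuple[str, dict[str, Any]]]  # ordered (processor, parameters)

def plan(source_spec: dict) -> tuple[dict[str, Steps], dict[str, list[str]]]:
    name = source_spec["meta"]["dataset"]
    pipelines: dict[str, Steps] = {
        f"{name}/csv": [("load", {}), ("dump_csv", {})],
        f"{name}/sqlite": [("load_csv", {}), ("dump_sqlite", {})],
    }
    dag = {
        f"{name}/csv": [],
        f"{name}/sqlite": [f"{name}/csv"],  # sqlite waits for the derived csv
    }
    return pipelines, dag

pipelines, dag = plan({"meta": {"dataset": "vix-daily"}})
print(dag)  # {'vix-daily/csv': [], 'vix-daily/sqlite': ['vix-daily/csv']}
```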