[#796,site,docs][xl]: copy /docs/dms to portaljs

site/content/docs/dms/flows/airtunnel.png (new binary file, 216 KiB, not shown)

site/content/docs/dms/flows/design.md (new file, 225 lines)

# Data Factory Design

Our Data Factory system is called AirCan. A Data Factory is a set of services/components for processing and integrating data coming from different sources, plus the patterns and methods for integrating with CKAN and the DataHub.

## Components

```mermaid
graph LR

  subgraph Orchestration
    airflow[AirFlow]
    airflowservice[AirFlow service]
  end

  subgraph CKAN integration
    ckanhooks[CKAN extension to trigger and report on factory activity]
    ckanapi[API for triggering DAGs etc]
    ckanui[UI integration - display info on ]
  end

  subgraph Processors and Flows
    ckandatastoreload[CKAN Loader lib]
    ckanharveters[CKAN Harvesters]
    validation[Validation Lib]
  end
```

## DataStore Load job story

### Reporting Integration

When I upload a file to CKAN and it is getting loaded to the DataStore (automatically), I want to know if that succeeded or failed so that I can share with my users that the new data is available (or do something about the error).

For a remote Airflow instance (let's say on Google Cloud Composer), the DAG tasks and the process look like this:

* File upload on CKAN triggers the ckanext-aircan connector
  * which makes an API request to Airflow on GCP and triggers a DAG with the following parameters:
    * A f11s (Frictionless) resource object including
      * the remote location of the CSV file and the resource ID
    * The target resource id
    * An API key to use when loading to the CKAN DataStore
    * [A callback url]
* The DAG
  * deletes the datastore table if it exists
  * creates a new datastore table
  * loads the CSV from the specified location (information available in the DAG parameters)
  * converts the CSV to JSON. The output of the converted JSON file will be in a bucket on GCP.
  * upserts the JSON data row by row into the CKAN DataStore via CKAN's DataStore API (see the sketch after this list)
    * This is what we have now: invoking `/api/3/action/datastore_create` and passing the contents of the JSON file
    * OR using upsert with inserts (faster). NB: datapusher just pushes the whole thing into `datastore_create`, so stick with that.
    * OR: if we are doing a Postgres COPY we need direct access to the Postgres DB
  * ... [tbd] notifies the CKAN instance of this (?)
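
As a rough illustration of the load/upsert steps above, here is a minimal sketch using the `requests` library against CKAN's DataStore API; the function name, `fields`, and `rows` are illustrative, not the actual AirCan code:

```python
import requests

def load_rows_to_datastore(ckan_url, api_key, resource_id, fields, rows):
    """Sketch of the DAG's load step: (re)create the DataStore table for a
    resource, then push the converted JSON records via the DataStore API."""
    headers = {"Authorization": api_key}

    # Delete any existing DataStore table for this resource (may 404 on first load).
    requests.post(f"{ckan_url}/api/3/action/datastore_delete",
                  json={"resource_id": resource_id, "force": True}, headers=headers)

    # Create a fresh table with the schema derived from the CSV.
    requests.post(f"{ckan_url}/api/3/action/datastore_create",
                  json={"resource_id": resource_id, "fields": fields, "force": True},
                  headers=headers).raise_for_status()

    # Insert the JSON records (datastore_upsert with method=insert; could be batched).
    requests.post(f"{ckan_url}/api/3/action/datastore_upsert",
                  json={"resource_id": resource_id, "records": rows,
                        "method": "insert", "force": True},
                  headers=headers).raise_for_status()
```
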
Error handling and other topics to consider:

* How can we let CKAN know something went wrong? Shall we create a way to notify a certain endpoint on the ckanext-aircan connector?
* Shall we also implement a timeout on CKAN?
* What are we going to display in case of an error?
* The "tmp" bucket on GCP will eventually get full of files; shall we flush it? How do we know when it's safe to delete a file?
* Lots of ways up this mountain.
* What do we do for large files?

## AirCan API

AirCan is built on AirFlow, so we have the same basic API. TODO: insert link

However, we have standard message formats to pass to DAGs, following this principle: all dataset and data resource objects should follow the Frictionless (f11s) specs.

Pseudo-code showing how we call the API:

```python=
airflow.dag_run({
    "conf": {
        "resource": json.dumps({   # f11s resource object
            "resource_id": ...,
            "path": ...,
            "schema": ...,
        }),
        "ckan_api_key": ...,
        "ckan_api_endpoint": "demo.ckan.org/api/",
    }
})
```

See the latest, up-to-date version at: https://github.com/datopian/ckanext-aircan/blob/master/ckanext/aircan_connector/action.py#L68
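
For orientation only, a hedged sketch of what the underlying HTTP call to a remote Airflow instance could look like via the experimental REST API referenced in the appendix; the base URL is a placeholder and the conf keys mirror the pseudo-code above:

```python
import json
import requests

AIRFLOW_URL = "https://<your-composer-webserver>"  # placeholder, not a real endpoint

def trigger_load_dag(resource, ckan_api_key, ckan_api_endpoint,
                     dag_id="ckan_api_load_gcp"):
    """Trigger a DagRun with the conf payload described above."""
    payload = {
        "conf": {
            "resource": json.dumps(resource),  # f11s resource object
            "ckan_api_key": ckan_api_key,
            "ckan_api_endpoint": ckan_api_endpoint,
        }
    }
    resp = requests.post(f"{AIRFLOW_URL}/api/experimental/dags/{dag_id}/dag_runs",
                         json=payload)
    resp.raise_for_status()
    return resp.json()  # e.g. {"message": "Created <DagRun ...>"}
```
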
## CKAN integration API

There is a new API as follows:

`http://ckan:5000/api/3/action/aircan_submit?dag_id=...&dataset=...&resource`
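
A sketch of calling that action from Python; the parameter names follow the URL pattern above, but the authoritative signature lives in ckanext-aircan:

```python
import requests

def aircan_submit(ckan_url, api_key, dag_id, dataset, resource):
    """Ask CKAN to trigger the given AirCan DAG for a dataset/resource."""
    resp = requests.post(
        f"{ckan_url}/api/3/action/aircan_submit",
        json={"dag_id": dag_id, "dataset": dataset, "resource": resource},
        headers={"Authorization": api_key},
    )
    resp.raise_for_status()
    return resp.json()
```
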
Also DAGs can get triggered on events ... TODO: go look at GitHub Actions and learn from it ...

## Architecture

Other principles of the architecture:

* AirFlow tasks and DAGs should do very little themselves and should hand off to separate libraries. Why? To have better separation of concerns and **testability**. AirCan is reasonably cumbersome to test but an SDK is much more testable.
* Thus AirFlow tasks are often just going to pass through arguments. TODO: expand this with an example (a sketch follows below) ...
* An AirFlow DAG will have incoming data and config set in the "global" config for the DAG and so available to every task ...
* Tasks should be as decoupled as possible. Obviously there *is* some data and metadata passing between tasks, and that should be done by writing those to a storage bucket. Metadata MUST be stored in f11s format.
* See this interesting blog post (not scientific) about why the previous approach, with side effects, is not very resilient in the long run of a project: https://medium.com/@maximebeauchemin/functional-data-engineering-a-modern-paradigm-for-batch-data-processing-2327ec32c42a
* don't pass data explicitly between tasks (rather it is passed implicitly via an expectation of where the data is stored ...)
* tasks and flows should be re-runnable ... (no side effects principle)
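
A sketch of the "pass through arguments" idea (Airflow 1.10-style operators; `aircan_lib.load_csv_to_datastore` is a hypothetical library function standing in for the real SDK):

```python
from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG("ckan_datastore_load", start_date=datetime(2020, 7, 1), schedule_interval=None)

def _load(**context):
    # The task does almost nothing itself: it reads the DAG-level conf and
    # hands straight off to a separately testable library function.
    conf = context["dag_run"].conf or {}
    import aircan_lib  # hypothetical SDK that does the real work
    return aircan_lib.load_csv_to_datastore(
        resource=conf["resource"],
        ckan_api_key=conf["ckan_api_key"],
        ckan_api_endpoint=conf["ckan_api_endpoint"],
    )

load_task = PythonOperator(task_id="load_csv_to_datastore", python_callable=_load,
                           provide_context=True, dag=dag)
```
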

Each task can write to this location:

```
bucket/dagid/runid/taskid/resource.json
bucket/dagid/runid/taskid/dataset.json
bucket/dagid/runid/taskid/... # data files
```
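
A sketch of a task persisting its outputs under that convention; it assumes the bucket is on Google Cloud Storage (the `google-cloud-storage` client), since AirCan runs on GCP:

```python
import json
from google.cloud import storage  # assumption: artefacts live in a GCS bucket

def write_task_output(bucket_name, dag_id, run_id, task_id, name, payload):
    """Write a JSON artefact to bucket/dagid/runid/taskid/<name>."""
    path = f"{dag_id}/{run_id}/{task_id}/{name}"
    blob = storage.Client().bucket(bucket_name).blob(path)
    blob.upload_from_string(json.dumps(payload), content_type="application/json")
    return f"gs://{bucket_name}/{path}"

# The next task does not receive this data explicitly; it simply expects
# resource.json / dataset.json to exist at the agreed path for this run.
```
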
## UI in DMS

URL structure on a dataset:

```
# xxx is a dataset
/@myorg/xxx/actions/
/@myorg/xxx/actions/runs/{id}
```

Main question: to display this to the user we need some way to log which jobs are associated with which datasets (and users), and perhaps their status.

* we want to keep the factory relatively dumb (it does not know about datasets etc etc)
* in terms of capabilities we need a way to pass permissions into the data factory (you hand over the keys to your car)

Simplest approach (see the table sketch after this list):

* MetaStore (CKAN metadata db) has a Jobs table with the structure `| id | factory_id | job_type | created | updated | dataset | resource | user | status | info |` (where info is a json blob)
* status = one of `WAITING | RUNNING | DONE | FAILED | CANCELLED`. If failed, we should have stuff in info about that.
* `job_type` = one of `HARVEST | LOAD | VALIDATE ...`; it is there so we could have several different factory jobs in one db
* `info`: likely stuff
  * run time
  * error information (on failure)
  * success information: what was the outcome, where are the outputs if any, etc
* On creating a job in the factory, the factory returns a factory id. The metastore stores the factory id into a new job object along with dataset and user info ...
  * Qu: why have id and factory_id separate? Is there any situation where you have a job w/o a factory id?
* Then on loading a job page in the frontend you can poll the factory for info and status (if status is WAITING or RUNNING)
  * => do we need the `info` column on the job (it's just a cache of this info)?
  * Ans: useful for jobs which are complete so we don't keep polling the factory (esp. if the factory deletes stuff)
* Can list all jobs for a given dataset (or resource) with info about them
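
A sketch of that Jobs table as DDL, run here against SQLite purely for illustration (the real MetaStore is CKAN's database); columns and status values follow the spec above:

```python
import sqlite3

SCHEMA = """
CREATE TABLE IF NOT EXISTS jobs (
    id          TEXT PRIMARY KEY,
    factory_id  TEXT,   -- id returned by the factory (e.g. the AirCan DagRun)
    job_type    TEXT,   -- HARVEST | LOAD | VALIDATE ...
    created     TEXT,
    updated     TEXT,
    dataset     TEXT,
    resource    TEXT,
    user        TEXT,
    status      TEXT,   -- WAITING | RUNNING | DONE | FAILED | CANCELLED
    info        TEXT    -- JSON blob: run time, error / success details, output locations
);
"""

conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)
```
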
Qus:

* For the Data Factory: what do I do with Runs that are stale etc.? How do I know who they are associated with? Can I store metadata on my Runs, like who requested them, etc.?

### UI Design

Example from Github:



## Appendix

### Notes re AirCan API

https://medium.com/@ptariche/interact-with-apache-airflows-experimental-api-3eba195f2947

```
{"message":"Created <DagRun ckan_api_load_gcp @ 2020-07-14 13:04:43+00:00: manual__2020-07-14T13:04:43+00:00, externally triggered: True>"}

GET /api/experimental/dags/<string:dag_id>/dag_runs/<string:execution_date>

GET /api/experimental/dags/ckan_api_load_gcp/dag_runs/2020-07-14 13:04:43+00:00

https://b011229e45c662be6p-tp.appspot.com/api/experimental/dags/ckan_api_load_gcp/dag_runs/2020-07-14T13:04:43+00:00

Resp: `{"state":"failed"}`
```

### Google Cloud Composer

Google Cloud Composer is a hosted version of AirFlow on Google Cloud.

#### How Google Cloud Composer differs from local AirFlow

* File handling: on GCP, all file handling must become interaction with a bucket. ~rufus: what about a file from a URL online (but not in a bucket)?

  Specifying the CSV resource location (on a local Airflow) must become sending a resource to a bucket (or just parsing it from the JSON body). When converting it to a JSON file, it must become an action of creating a file on a bucket.

* Authentication: TODO

### AirFlow Best Practices

* Should you pass information between tasks, and if so, how?
* https://medium.com/datareply/airflow-lesser-known-tips-tricks-and-best-practises-cf4d4a90f8f
* https://towardsdatascience.com/airflow-sharing-data-between-tasks-7bbaa27eeb1

### What terminology should we use?

ANS: we use AirFlow terminology:

* Task
* DAG
* DagRun

For internals, what are the options?

* Task or Processor or ...
* DAG or Flow or Pipeline?

TODO: table summarizing options in AirFlow, Luigi, Apache Beam etc.

#### UI Terminology

* Actions
* Workflows

Terminology options:

* Gitlab
  * Pipelines: you have
    * Jobs (runs of those pipelines)
  * Schedules
* Github
  * Workflows
  * Runs
  * (Schedules - not explicit)
* Airflow
  * DAGs
  * Tasks
  * DAG Runs

site/content/docs/dms/flows/history.md (new file, 517 lines)

# Data Factory and Flows Design - Oct 2017 to Apr 2018
|
||||
|
||||
Date: 2018-04-08
|
||||
|
||||
> [!note]
|
||||
>This is miscellaneous content from various HackMD docs. I'm preserving it because either a) there is material to reuse here that I'm not sure is elsewhere, or b) there were various ideas in here that we used later (and it's useful to see their origins).
|
||||
>
|
||||
>Key content:
|
||||
>
|
||||
>* March-April 2018: first planning of what became dataflows (had various names including dataos). A lot of my initial ideas ended up in this micro-demo https://github.com/datopian/dataflow-demo. This evolved with Adam into https://github.com/datahq/dataflows
|
||||
>* Autumn 2017: planning of Data Factory, which was the data processing system inside DataHub.io. This was more extensive than dataflows (e.g. it included a runner, an assembler etc) and was based on the original data-package-pipelines and its runner. Issues with that system were part of the motivation for starting work on dataflows.
|
||||
>
|
||||
>~Rufus May 2020
|
||||
|
||||
|
||||
|
||||
## Plan April 2018
|
||||
|
||||
* tutorial (what we want our first post to look like)
|
||||
* And then implement minimum for that
|
||||
* programmatic use of pipelines and processors in DPP
|
||||
* processor abstraction defined ...
|
||||
* DataResource and DataPackage object that looks like [Frictionless Lib pattern][frictionless-lib]
|
||||
* processors library split out
|
||||
* code runner you can call dataos.run.runSyncPipeline
|
||||
* dataflow init => python and yaml
|
||||
* @adam Write up Data Factory architecture and naming as it currently stands [2h]
|
||||
|
||||
[frictionless-lib]: http://okfnlabs.org/blog/2018/02/15/design-pattern-for-a-core-data-library.html
|
||||
|
||||
## 8 April 2018
|
||||
|
||||
Lots of notes on DataFlow, which are now moved and refactored into https://github.com/datahq/dataflow-demo
|
||||
|
||||
The Domain Model of Factory
|
||||
|
||||
* Staging area
|
||||
* Planner
|
||||
* Runner
|
||||
* Flow
|
||||
* Pipelines
|
||||
* Processors
|
||||
|
||||
```mermaid
|
||||
graph TD
|
||||
|
||||
flow[Flow]
|
||||
pipeline[Pipelines]
|
||||
processor[Processors]
|
||||
|
||||
flow --> pipeline
|
||||
pipeline --> processor
|
||||
```
|
||||
|
||||
Assembler ...
|
||||
|
||||
```mermaid
|
||||
graph LR
|
||||
|
||||
source[Source from User<br/>source.yaml] --assembler--> plan[Execution Plan<br/>DAG of pipelines]
|
||||
plan --> pipeline[Pipeline Runner<br/>Optimizer/Dependency management]
|
||||
```
|
||||
|
||||
=> Assembler generates a DAG.
|
||||
|
||||
- dest filenames in advance ...
|
||||
- for each pipeline: pipelines it depends on
|
||||
- e.g. sqlite: depends on all derived csv pipelines running
|
||||
- node depends: all csv, all json pipelines running
|
||||
- zip: depends on all csv running
|
||||
- Pipelines
|
||||
|
||||
|
||||
```mermaid
|
||||
graph LR
|
||||
|
||||
source[Source Spec Parse<br/>validate?] --> planner[Planner]
|
||||
planner --> workplanb(Plan of Work<br/><br/>DAG of pipelines)
|
||||
|
||||
subgraph Planner
|
||||
planner
|
||||
subplan1[Sub Planner 1 e.g. SQLite]
|
||||
end
|
||||
```
|
||||
|
||||
|
||||
## 5 Oct 2017
|
||||
|
||||
Notes:
|
||||
|
||||
* Adam: finding more and more bugs (edge cases) and then applying fixes but then more issues
|
||||
* => Internal data model of pipelines was wrong ...
|
||||
* Original data model has a store: with one element the pipeline + a state (its idle, invalid, waiting to be executed, running, dirty)
|
||||
* Problem starts: you have a very long pipeline ...
|
||||
* something changes and the pipeline gets re-added to the queue. Then you have the same pipeline in the queue in two different states. It should not be a state of the pipeline but a state of the execution of the pipeline.
|
||||
* Split model: pipeline (with their hash) + "runs" ordered by time of request
|
||||
|
||||
Questions:
|
||||
|
||||
* Tests in assembler ...
|
||||
|
||||
### Domain Model
|
||||
|
||||
```mermaid
|
||||
graph TD
|
||||
|
||||
flow[Flow]
|
||||
pipeline[Pipelines]
|
||||
processor[Processors]
|
||||
|
||||
flow --> pipeline
|
||||
pipeline --> processor
|
||||
```
|
||||
|
||||
* Pipelines have no branches they are always linear
|
||||
* Input: nothing or a file or a datapackage (source is stream or nothing)
|
||||
* Output: datapackage - usually dumped to something (could be stream)
|
||||
* Pipelines are a list of processors **and** their inputs
|
||||
* A Flow is a DAG of pipelines (see the sketch after this list)
|
||||
* In our case: one flow produces a "Dataset" at a given "commit/run"
|
||||
* Source Spec + DataPackage[.json] => (via assembler) => Flow Spec
|
||||
* Runner
|
||||
* Pipelines runner: a set of DAGs of pipelines (where each pipeline is scheduled to run once all its dependencies have run)
|
||||
* Events => lead to new flows or pipelines being created ... (or existing ones being stopped or destroyed)
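
A minimal sketch of that domain model as Python dataclasses (names and fields are illustrative):

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List

@dataclass
class Processor:
    """Atomic step: gets its parameters and runs (no side effects)."""
    name: str
    run: Callable[..., object]
    parameters: dict = field(default_factory=dict)

@dataclass
class Pipeline:
    """Linear (branch-free) list of processors and their inputs."""
    id: str
    steps: List[Processor]

@dataclass
class Flow:
    """DAG of pipelines: each pipeline id maps to the ids it depends on."""
    pipelines: Dict[str, Pipeline]
    dependencies: Dict[str, List[str]] = field(default_factory=dict)
```
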
```mermaid
|
||||
graph LR
|
||||
|
||||
subgraph flow2
|
||||
x --> z
|
||||
y --> z
|
||||
end
|
||||
|
||||
subgraph flow1
|
||||
a --> c
|
||||
b --> c
|
||||
end
|
||||
```
|
||||
|
||||
State of Factory(flows, state)
|
||||
|
||||
f(event, state) => state
|
||||
|
||||
flow dependencies?
|
||||
|
||||
Desired properties
|
||||
|
||||
* We economise on runs: we don't rerun processors (pipelines?) that have same config and input data
|
||||
* => one reason for breaking down into smaller "operators" is that we economise here ...
|
||||
* Simplicity: the system is understandable ...
|
||||
* Processors (pipelines) are atomic - they get their configuration and run ...
|
||||
* We can generate from a source spec and an original datapackage.json a full set of pipelines / processors.
|
||||
* Pipelines as Templates vs Pipelines as instances ...
|
||||
|
||||
pipeline id = hash(pipeline spec, datapackage.json)
|
||||
|
||||
{pipelineid}/...
|
||||
|
||||
next pipeline can rely on {pipelineid}
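
A sketch of that id scheme: hash a canonical serialisation of the pipeline spec plus the input datapackage.json, so an unchanged pipeline resolves to the same id (and output path) and never needs to be rerun:

```python
import hashlib
import json

def pipeline_id(pipeline_spec: dict, datapackage: dict) -> str:
    """id = hash(pipeline spec, datapackage.json), stable across key order."""
    canonical = json.dumps([pipeline_spec, datapackage], sort_keys=True)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:16]

# outputs live under {pipelineid}/..., so identical spec + input data
# means cached outputs can be reused instead of rerunning the pipeline
```
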
Planner ...
|
||||
|
||||
=> a pipeline is never rerun (once it is run)
|
||||
|
||||
| | Factory | Airflow |
|--------------|----------------------------|-----------|
| DAG | Implicit (no concept) | DAG |
| Node | Pipelines (or processors?) | Operators |
| Running Node | ? Running pipelines | Tasks |
| Comments | | |
|
||||
|
||||
https://airflow.incubator.apache.org/concepts.html#workflows
|
||||
|
||||
# Analysis from work for Preview
|
||||
|
||||
As a Publisher I want to upload a 50Mb CSV so that the showcase page works - it does not crash my browser (because it is trying to load and display 50Mb of CSV)
|
||||
|
||||
*plus*
|
||||
|
||||
As a Publisher I want to customize whether I generate a preview for a file or not so that I don't get inappropriate previews
|
||||
|
||||
> As a Publisher I want to have an SQLite version of my data auto-built
|
||||
>
|
||||
> As a Publisher I want to upload an Excel file and have csv versions of each sheet and an sqlite version
|
||||
|
||||
### Overview
|
||||
|
||||
*This is what we want*
|
||||
|
||||
```mermaid
|
||||
graph LR
|
||||
|
||||
subgraph Source
|
||||
file[vix-daily.csv]
|
||||
view[timeseries-1<br/>Uses vix-daily]
|
||||
end
|
||||
|
||||
subgraph "Generated Resources"
|
||||
gcsv[derived/vix-daily.csv]
|
||||
gjson[derived/vix-daily.json]
|
||||
gjsonpre[derived/vix-daily-10k.json]
|
||||
gjsonpre2[derived/view-time-series-1.json]
|
||||
end
|
||||
|
||||
subgraph "Generated Views"
|
||||
preview[Table Preview]
|
||||
end
|
||||
|
||||
file --rule--> gcsv
|
||||
file --rule--> gjson
|
||||
file --rule--> preview
|
||||
|
||||
view --> gjsonpre2
|
||||
preview --> gjsonpre
|
||||
```
|
||||
|
||||
### How does this work?
|
||||
|
||||
#### Simple example: no previews, single CSV in source
|
||||
|
||||
```mermaid
|
||||
graph LR
|
||||
|
||||
load[Load Data Package<br/>Parse datapackage.json]
|
||||
parse[Parse source data]
|
||||
dump((S3))
|
||||
|
||||
load --> parse
|
||||
parse --> dcsv
|
||||
parse --> djson
|
||||
parse --> dumpdp
|
||||
dcsv --> dump
|
||||
djson --> dump
|
||||
dsqlite --> dump
|
||||
dnode --> dump
|
||||
dumpdp --> dump
|
||||
|
||||
dcsv --> dnode
|
||||
dcsv --> dsqlite
|
||||
|
||||
subgraph "Dumpers 1"
|
||||
dcsv[Derived CSV]
|
||||
djson[Derived JSON]
|
||||
end
|
||||
|
||||
subgraph "Dumpers 2 - after Dumper 1"
|
||||
dsqlite[SQLite]
|
||||
dnode[Node]
|
||||
end
|
||||
|
||||
dumpdp[Derived DataPackage.json<br/><br/>Assembler gives it the DAG info<br/>Runs after everything<br/>as needs size,md5 etc]
|
||||
```
|
||||
|
||||
```yaml=
|
||||
meta:
|
||||
owner: <owner username>
|
||||
ownerid: <owner unique id>
|
||||
dataset: <dataset name>
|
||||
version: 1
|
||||
findability: <published/unlisted/private>
|
||||
inputs:
|
||||
- # only one input is supported atm
|
||||
kind: datapackage
|
||||
url: <datapackage-url>
|
||||
parameters:
|
||||
resource-mapping:
|
||||
<resource-name-or-path>: <resource-url>
|
||||
|
||||
outputs:
|
||||
- ... see https://github.com/datahq/pm/issues/17
|
||||
```
|
||||
|
||||
#### With previews
|
||||
|
||||
```mermaid
|
||||
graph LR
|
||||
|
||||
load[Load Data Package<br/>Parse datapackage.json]
|
||||
parse[Parse source data]
|
||||
dump((S3))
|
||||
|
||||
load --> parse
|
||||
parse --> dcsv
|
||||
parse --> djson
|
||||
parse --> dumpdp
|
||||
dcsv --> dump
|
||||
djson --> dump
|
||||
dsqlite --> dump
|
||||
dnode --> dump
|
||||
dumpdp --> dump
|
||||
|
||||
dcsv --> dnode
|
||||
dcsv --> dsqlite
|
||||
|
||||
parse --> viewgen
|
||||
viewgen --> previewgen
|
||||
previewgen --view-10k.json--> dump
|
||||
|
||||
subgraph "Dumpers 1"
|
||||
dcsv[Derived CSV]
|
||||
djson[Derived JSON]
|
||||
end
|
||||
|
||||
subgraph "Dumpers 2 - after Dumper 1"
|
||||
dsqlite[SQLite]
|
||||
dnode[Node]
|
||||
viewgen[Preview View Gen<br/><em>Adds preview views</em>]
|
||||
previewgen[View Resource Generator]
|
||||
end
|
||||
|
||||
dumpdp[Derived DataPackage.json<br/><br/>Assembler gives it the DAG info<br/>Runs after everything<br/>as needs size,md5 etc]
|
||||
```
|
||||
|
||||
### With Excel (multiple sheets)
|
||||
|
||||
Source is vix-daily.xls (with 2 sheets)
|
||||
|
||||
|
||||
```mermaid
|
||||
graph LR
|
||||
|
||||
load[Load Data Package<br/>Parse datapackage.json]
|
||||
parse[Parse source data]
|
||||
dump((S3))
|
||||
|
||||
dumpdp[Derived DataPackage.json<br/><br/>Assembler gives it the DAG info<br/>Runs after everything<br/>as needs size,md5 etc]
|
||||
|
||||
load --> parse
|
||||
|
||||
parse --> d1csv
|
||||
parse --> d1json
|
||||
parse --> d2csv
|
||||
parse --> d2json
|
||||
|
||||
parse --> dumpdp
|
||||
|
||||
d1csv --> dump
|
||||
d1json --> dump
|
||||
d2csv --> dump
|
||||
d2json --> dump
|
||||
|
||||
dsqlite --> dump
|
||||
dnode --> dump
|
||||
dumpdp --> dump
|
||||
|
||||
d1csv --> dnode
|
||||
d1csv --> dsqlite
|
||||
d2csv --> dnode
|
||||
d2csv --> dsqlite
|
||||
|
||||
d1csv --> view1gen
|
||||
d2csv --> view2gen
|
||||
view1gen --> preview1gen
|
||||
view2gen --> preview2gen
|
||||
preview1gen --view1-10k.json--> dump
|
||||
preview2gen --view2-10k.json--> dump
|
||||
|
||||
subgraph "Dumpers 1 sheet 1"
|
||||
d1csv[Derived CSV]
|
||||
d1json[Derived JSON]
|
||||
end
|
||||
|
||||
subgraph "Dumpers 1 sheet 2"
|
||||
d2csv[Derived CSV]
|
||||
d2json[Derived JSON]
|
||||
end
|
||||
|
||||
subgraph "Dumpers 2 - after Dumper 1"
|
||||
dsqlite[SQLite]
|
||||
dnode[Node]
|
||||
view1gen[Preview View Gen<br/><em>Adds preview views</em>]
|
||||
preview1gen[View Resource Generator]
|
||||
view2gen[Preview View Gen<br/><em>Adds preview views</em>]
|
||||
preview2gen[View Resource Generator]
|
||||
end
|
||||
```
|
||||
|
||||
datapackage.json
|
||||
|
||||
```javascript=
|
||||
{
|
||||
resources: [
|
||||
{
|
||||
"name": "mydata"
|
||||
"path": "mydata.xls"
|
||||
}
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
```yaml=
|
||||
meta:
|
||||
owner: <owner username>
|
||||
ownerid: <owner unique id>
|
||||
dataset: <dataset name>
|
||||
version: 1
|
||||
findability: <published/unlisted/private>
|
||||
inputs:
|
||||
- # only one input is supported atm
|
||||
kind: datapackage
|
||||
url: <datapackage-url>
|
||||
parameters:
|
||||
resource-mapping:
|
||||
<resource-name-or-path>: <resource-url>
|
||||
|
||||
processing:
|
||||
-
|
||||
input: <resource-name> # mydata
|
||||
output: <resource-name> # mydata_sheet1
|
||||
tabulator:
|
||||
sheet: 1
|
||||
-
|
||||
input: <resource-name> # mydata
|
||||
output: <resource-name> # mydata_sheet2
|
||||
tabulator:
|
||||
sheet: 2
|
||||
```
|
||||
|
||||
```yaml=
|
||||
meta:
|
||||
owner: <owner username>
|
||||
ownerid: <owner unique id>
|
||||
dataset: <dataset name>
|
||||
version: 1
|
||||
findability: <published/unlisted/private>
|
||||
inputs:
|
||||
- # only one input is supported atm
|
||||
kind: datapackage
|
||||
url: <datapackage-url>
|
||||
parameters:
|
||||
resource-mapping:
|
||||
<resource-name-or-path>: <resource-url> // excel file
|
||||
|
||||
=> (implicitly and in the cli becomes ...)
|
||||
|
||||
...
|
||||
|
||||
processing:
|
||||
-
|
||||
input: <resource-name> # mydata
|
||||
output: <resource-name> # mydata-sheet1
|
||||
tabulator:
|
||||
sheet: 1
|
||||
```
|
||||
|
||||
Result
|
||||
|
||||
```javascript=
|
||||
{
|
||||
resources: [
|
||||
{
|
||||
"name": "mydata",
|
||||
"path": "mydata.xls"
|
||||
},
|
||||
{
|
||||
"path": "derived/mydata.xls.sheet1.csv"
|
||||
"datahub": {
|
||||
"derivedFrom": "mydata"
|
||||
}
|
||||
},
|
||||
{
|
||||
"path": "derived/mydata.xls.sheet2.csv"
|
||||
"datahub": {
|
||||
"derivedFrom": "mydata"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
### Overall component design
|
||||
|
||||
Assembler ...
|
||||
|
||||
```mermaid
|
||||
graph LR
|
||||
|
||||
source[Source from User<br/>source.yaml] --assembler--> plan[Execution Plan<br/>DAG of pipelines]
|
||||
plan --> pipeline[Pipeline Runner<br/>Optimizer/Dependency management]
|
||||
```
|
||||
|
||||
=> Assembler generates a DAG.
|
||||
|
||||
- dest filenames in advance ...
|
||||
- for each pipeline: pipelines it depends on
|
||||
- e.g. sqlite: depends on all derived csv pipelines running
|
||||
- node depends: all csv, all json pipelines running
|
||||
- zip: depends on all csv running
|
||||
- Pipelines
|
||||
|
||||
|
||||
```mermaid
|
||||
graph LR
|
||||
|
||||
source[Source Spec Parse<br/>validate?] --> planner[Planner]
|
||||
planner --> workplanb(Plan of Work<br/><br/>DAG of pipelines)
|
||||
|
||||
subgraph Planner
|
||||
planner
|
||||
subplan1[Sub Planner 1 e.g. SQLite]
|
||||
end
|
||||
```
|
||||
|
||||
### NTS
|
||||
|
||||
```
|
||||
function(sourceSpec) => (pipelines, DAG)
|
||||
|
||||
pipeline
|
||||
|
||||
pipeline-id
|
||||
steps = n *
|
||||
processor
|
||||
parameters
|
||||
schedule
|
||||
dependencies
|
||||
```
|
||||
site/content/docs/dms/flows/research.md (new file, 109 lines)

# Data Flows + Factory - Research
|
||||
|
||||
## Tooling
|
||||
|
||||
* Luigi & Airflow
|
||||
* These are task runners - managing a dependency graph between 1000s of tasks.
|
||||
* Neither of them focuses on actual data processing, and they are not data streaming solutions. Tasks do not move data from one to the other.
|
||||
* AirFlow: see further analysis below
|
||||
* Nifi: Server based, Java, XML - not really suitable for quick prototyping
|
||||
* Cascading: Only Java support
|
||||
* Bubbles http://bubbles.databrewery.org/documentation.html - https://www.slideshare.net/Stiivi/data-brewery-2-data-objects
|
||||
* mETL https://github.com/ceumicrodata/mETL mito ETL (yaml config files)
|
||||
* Apache Beam: see below
|
||||
* https://delta.io/ - acid for data lakes (mid 2020). Comes out of DataBricks. Is this pattern or tooling?
|
||||
|
||||
|
||||
## Concepts
|
||||
|
||||
* Stream and Batch dichotomy is probably a false one -- and unhelpful. Batch is just some grouping of stream. Batch done regularly enough starts to be a stream.
|
||||
* More useful is complete vs incomplete data sources
|
||||
* The hard part of streaming (or batch) work is handling the case where events arrive "late". For example, let's say I want to total up transaction volume at a bank per day ... but some transactions arrive at the server late, e.g. a transaction at 2355 actually arrives at 1207 because of network delay or some other issue; then if I batch at 1200 based on what has arrived, I have an issue. Most of the work and complexity in the Beam / DataFlow model relates to this.
|
||||
* Essential duality between flows and states via difference and sum (see the sketch after this list). E.g. transaction and balance:
|
||||
* Balance over time -- differenced --> Flow
|
||||
* Flow -- summed --> Balance
|
||||
* Balance is often just a cached "sum".
|
||||
* Also relevant to datasets: we often think of them as states but really they are a flow.
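
The duality in a few lines of Python, with transactions as the flow and balances as the running sum:

```python
from itertools import accumulate

transactions = [100, -40, 25, -10]               # the flow
balances = list(accumulate(transactions))        # the states: [100, 60, 85, 75]
recovered = [b - a for a, b in zip([0] + balances, balances)]  # differencing
assert recovered == transactions                 # differencing the states recovers the flow
```
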
### Inbox
|
||||
|
||||
* [x] DataFlow paper: "The Dataflow Model: A Practical Approach to Balancing Correctness, Latency, and Cost in Massive-Scale, Unbounded, Out-of-Order Data Processing" (2015)
|
||||
* [ ] Stream vs Batch
|
||||
* [x] Streaming 101: The world beyond batch. A high-level tour of modern data-processing concepts. (Aug 2015) https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-101 **Good intro to streaming and DataFlow by one of its authors**
|
||||
* [ ] https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-102 Follow up to previous paper
|
||||
* [ ] Apache Beam **in progress -- see below**
|
||||
* [ ] dbt **initial review. Mainly a convenient way of tracking in-DB transforms**
|
||||
* [ ] Frictionless DataFlows
|
||||
* [x] Kreps (kafka author): https://www.oreilly.com/radar/questioning-the-lambda-architecture/
|
||||
* lambda architecture is where you run both batch and streaming in parallel as way to have traditional processing plus some kind of real-time results.
|
||||
* basically Kreps says its a PITA to keep two parallel systems running and you can just go "streaming" (remember we are beyond the dichotomy)
|
||||
|
||||
## Apache Beam
|
||||
|
||||
https://beam.apache.org/blog/2017/02/13/stateful-processing.html
|
||||
|
||||
### Pipeline
|
||||
|
||||
https://beam.apache.org/releases/pydoc/2.2.0/apache_beam.pipeline.html
|
||||
|
||||
Pipeline, the top-level Beam object.
|
||||
|
||||
A pipeline holds a DAG of data transforms. Conceptually the nodes of the DAG are transforms (PTransform objects) and the edges are values (mostly PCollection objects). The transforms take as inputs one or more PValues and output one or more PValues.
|
||||
|
||||
The pipeline offers functionality to traverse the graph. The actual operation to be executed for each node visited is specified through a runner object.
|
||||
|
||||
Typical usage:
|
||||
|
||||
```python
|
||||
# Create a pipeline object using a local runner for execution.
|
||||
with beam.Pipeline('DirectRunner') as p:
|
||||
|
||||
# Add to the pipeline a "Create" transform. When executed this
|
||||
# transform will produce a PCollection object with the specified values.
|
||||
pcoll = p | 'Create' >> beam.Create([1, 2, 3])
|
||||
|
||||
# Another transform could be applied to pcoll, e.g., writing to a text file.
|
||||
# For other transforms, refer to transforms/ directory.
|
||||
pcoll | 'Write' >> beam.io.WriteToText('./output')
|
||||
|
||||
# run() will execute the DAG stored in the pipeline. The execution of the
|
||||
# nodes visited is done using the specified local runner.
|
||||
```
|
||||
|
||||
## Airflow
|
||||
|
||||
Airflow organizes tasks in a DAG. A DAG (Directed Acyclic Graph) is a collection of all the tasks you want to run, organized in a way that reflects their relationships and dependencies (see the sketch after the list below).
|
||||
|
||||
* Each task could be Bash, Python or others.
|
||||
* You can connect the tasks in a DAG as you want (which one depends on which).
|
||||
* Tasks could be built from Jinja templates.
|
||||
* It has a nice and comfortable UI.
|
||||
|
||||
You can also use _Sensors_: you can wait for certain files or database changes to trigger other jobs.
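
A minimal sketch of those pieces wired together (Airflow 1.10-style imports): a Bash task and a Python task in one DAG, with an explicit dependency:

```python
from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

dag = DAG(
    dag_id="example_etl",
    start_date=datetime(2020, 1, 1),
    schedule_interval="@daily",   # cron-style schedule
)

fetch = BashOperator(task_id="fetch", bash_command="echo fetching data", dag=dag)

def transform():
    print("transforming data")

process = PythonOperator(task_id="transform", python_callable=transform, dag=dag)

fetch >> process  # transform depends on fetch
```
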
References
|
||||
|
||||
* https://github.com/apache/airflow
|
||||
* https://medium.com/videoamp/what-we-learned-migrating-off-cron-to-airflow-b391841a0da4
|
||||
* https://medium.com/@rbahaguejr/airflow-a-beautiful-cron-alternative-or-replacement-for-data-pipelines-b6fb6d0cddef
|
||||
|
||||
|
||||
### airtunnel
|
||||
|
||||
https://github.com/joerg-schneider/airtunnel
|
||||
|
||||
* https://medium.com/bcggamma/airtunnel-a-blueprint-for-workflow-orchestration-using-airflow-173054b458c3 - an excellent piece on how to apply patterns to Airflow ("airtunnel"), plus an overview of key tooling
|
||||
|
||||
> This is why we postulate to have a central declaration file (as in YAML or JSON) per data asset, capturing all these properties required to run a generalized task (carried out by a custom operator). In other words, operators are designed in a generic way and receive the name of a data asset, from which they can grab its declaration file and learn how to parameterize and carry out the specific task.
|
||||
|
||||
```
|
||||
├── archive
|
||||
├── ingest
|
||||
│ ├── archive
|
||||
│ └── landing
|
||||
├── ready
|
||||
└── staging
|
||||
├── intermediate
|
||||
├── pickedup
|
||||
└── ready
|
||||
```