tags : Distributed Systems, Database, Caches, Database Transactions

FAQ

TODO Data Layers?

NOTE: This is just for my understanding

  • Storage Layer: S3, Parquet, Other file types
  • Transaction Layer: Open table formats, eg. Delta Lake
  • Semantic Layer: Cube (this is the new kid, an enhancement)

File listing problem in Data lakes

  • When you want to read a Parquet lake, you must perform a file listing operation and then read all the data. You can’t read the data till you’ve listed all the files.
  • With Delta Lake, because of the transaction log, we can avoid the file listing and get the file paths directly (see the sketch below).
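
A minimal sketch of the difference, assuming the deltalake (delta-rs) Python package and hypothetical local paths: the Delta transaction log already knows which files make up the current table version, while a plain Parquet lake has to be listed first.

    import glob

    from deltalake import DeltaTable

    # Plain Parquet lake: must list every file under the prefix before reading.
    files = glob.glob("data/parquet-lake/**/*.parquet", recursive=True)  # hypothetical path

    # Delta Lake: the transaction log already holds the list of live data files.
    dt = DeltaTable("data/delta-table/")  # hypothetical path
    files = dt.file_uris()  # no directory/object-store listing needed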

Access patterns and Performance

File skipping

  • Column Pruning

    • Unlike CSV, with parquet we can do column pruning.
  • Parquet row-group Filtering

    • A Parquet file is split into row groups by the format itself.
    • Each row group has min/max statistics for each column.
    • We can skip row groups for a given column using these statistics before loading the data, and then apply our actual application filtering logic (see the sketch after this list).
  • File filtering

    • Plain Parquet doesn’t support file-level skipping; there is no table-level metadata to consult.
    • This can be done with Delta tables: we still apply the row-group filter, but additionally do file-level filtering using the statistics in the transaction log.
    • Because of the transaction log, file-level skipping gives even better results than plain row-group filtering on bare Parquet files.
  • Partition pruning/filtering

    • This assumes you’ve partitioned your data correctly
    • The more partitions you can “prune” or hone in on, the fewer files and the less data must be scanned for matches.
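
A quick sketch of column pruning plus filter pushdown with pandas/pyarrow; the dataset path, column names, and filter are hypothetical. `columns=` avoids reading unneeded columns, and `filters=` lets the engine skip row groups (and, for hive-partitioned data, whole directories) using the min/max statistics.

    import pandas as pd

    # Column pruning: only the listed columns are read from the Parquet files.
    # Filter pushdown: row groups whose min/max stats can't match are skipped.
    df = pd.read_parquet(
        "data/events/",                        # hypothetical dataset path
        columns=["user_id", "amount"],         # hypothetical columns
        filters=[("event_date", ">=", "2024-01-01")],
    )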

Compaction and Defrag

  • Compaction
    • Rewrites many small files into fewer, larger ones (see the sketch after this list).
  • Defrag
    • Z-ordering will co-locate similar data in the same files.
    • Z-order clustering can only occur within a partition. Because of this, you cannot Z-order on fields used for partitioning.
    • Z-order works for all fields, including high-cardinality fields and fields that may grow infinitely (for example, timestamps or the customer ID in a transactions or orders table).
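
A hedged sketch of both operations with the deltalake (delta-rs) Python package, assuming a recent version that exposes the optimizer API; the table path and column name are made up. (On Databricks/Spark the equivalent is the OPTIMIZE ... ZORDER BY SQL command.)

    from deltalake import DeltaTable

    dt = DeltaTable("data/orders_delta/")  # hypothetical table path

    # Compaction: rewrite many small files into fewer, larger ones.
    dt.optimize.compact()

    # Z-ordering: co-locate rows with similar values of the given column(s)
    # in the same files, so file-level min/max stats become more selective.
    dt.optimize.z_order(["customer_id"])   # hypothetical column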

Partitioning

  • Partitioning is the way to break big data up into chunks, logically and physically. Commonly things are partitioned over dates.
  • Partitioning works well only for low or known cardinality fields (for example, date fields or physical locations) but not for fields with high cardinality such as timestamps. (See z-ordering)
  • To check cardinality, you can list the number of distinct values per column (see the sketch below).
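
A tiny sketch of such a cardinality check with pandas (the dataset path is hypothetical): low distinct counts suggest partition candidates, huge counts (timestamps, IDs) are better served by z-ordering.

    import pandas as pd

    df = pd.read_parquet("data/orders/")  # hypothetical dataset

    # Number of distinct values per column, smallest first.
    print(df.nunique().sort_values())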

What about Databricks?

  • Databricks has some optimizations, such as ingestion-time clustering, that won’t be there if you’re using the open source version of Delta Lake. We can get creative and find ways to optimize based on our data, e.g. with partitioning, depending on what the workload needs.

General recommendations

  • Data processing engines don’t perform well when reading datasets with many small files. You typically want files that are between 64 MB and 1 GB. You don’t want tiny 1 KB files that require excessive I/O overhead.

What’s the evolution like?

Data cleaning tools

  • This is a slight enhancement
  • You can always do data cleaning using the standard library methods of the language you’re using (e.g. using optional chaining / coalescing etc.).
  • But there are certain libraries/processes which can maybe make the data cleaning processes simpler/easier to write and maintain/extend etc.
  • These tools can be especially useful when the response object is not guaranteed to be exactly the same every time and you need some verification that certain fields exist.
  • In R, you have tools such as dplyr (A Grammar of Data Manipulation).
  • In JavaScript, you have tidyjs, but it has its issues; even lodash can help a bit, though at that point you might as well just use optional chaining.
    • For this I thought we could use effect/schema or zod and split the work into 2 steps, as mentioned here:
      • The first model should capture the “raw” data more or less in the schema you expect from the API.
      • The second model should then reflect your own desired data schema.
      • This 2-step process helps debug the pipeline if ever needed. The same applies to validators in other languages, like pydantic (Python), as well.
  • But for this job, I’d personally prefer Python + pydantic; they go well together (see the sketch after this list).
  • Another thing: handle data cleaning at the application level if possible, not at the DB level. You should still enforce proper constraints for structured data, but much of the data might be unstructured and we might want to store it in JSONB columns etc.
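
A minimal sketch of the two-step idea with pydantic; the field names and the transformation are made up. The first model mirrors the raw API payload, the second reflects the schema you actually want downstream, which makes it easier to tell whether the API changed shape or your own transformation broke.

    from pydantic import BaseModel


    class RawUser(BaseModel):
        """Step 1: mirrors the payload as the API actually sends it."""
        id: str
        full_name: str | None = None
        signup_ts: str | None = None  # arrives as a string, may be missing


    class CleanUser(BaseModel):
        """Step 2: the schema we want downstream."""
        user_id: int
        name: str
        signed_up: str | None


    def clean(raw: RawUser) -> CleanUser:
        # Failures here point at our own transformation; failures constructing
        # RawUser point at the upstream API changing shape.
        return CleanUser(
            user_id=int(raw.id),
            name=raw.full_name or "unknown",
            signed_up=raw.signup_ts,
        )


    payload = {"id": "42", "full_name": "Ada"}
    print(clean(RawUser(**payload)))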

Data pipeline

The point of a data pipeline is to generate/maintain data assets (tables, files, ML models, etc.)

Data Warehouse and Data Lake

See Data systems built on Object Store

Data warehouse (Analytics, Structured Data)

  • Exists because we have different sources
    • OLTP source of truth database, but you also have SaaS products, events, logging exhaust, and honestly probably some other OLTP databases
  • Warehouse provides value because it brings together data that otherwise could not be jointly queried.
  • Traditionally used for analytical queries, but there are use cases for operational queries as well
  • Must take care of workload contention so that analytical workloads on the warehouse don’t affect operational queries that need fresh data.
  • Must separate the storage from the compute to scale.
  • Tools like DBT help interact with/build DW

Analytical

  • Designed to grind through reams of historical data and efficiently produce periodic reports.
  • E.g. Snowflake; it can be DuckDB, or even a bash script for that matter

Operational

Data Lake (Storage, raw/unstructed/structured data)

  • Throw data into files (JSON, CSV, Parquet) in a storage system like S3
  • This is also supposed to handle unstructured data like text, images, videos etc.
  • Eg. Parquet data lake: likely has a partitioning strategy that has been optimized for your existing workloads and systems.

Data Lakehouse

  • Combining
    • Analytics strengths of the Data warehouse
    • Scalability and inexpensive storage of the Data lake
  • Data warehousing principles applied over data in a data lake.
  • Open Table formats / Lakehouse storage systems

Technologies

Apache Spark

  • An alternative to MapReduce but better

Data formats

  • Arrow and Parquet are used together to leverage their respective strengths: Arrow’s efficiency in in-memory processing and Parquet’s effectiveness in on-disk data storage and retrieval.

Parquet (on disk)

  • Parquet was created to provide a compressed, efficient columnar data storage format
  • Data format that allows querying + carries metadata along with the actual data
  • Metadata: Schema info, File level metadata, Column level, custom user level, stats for data skipping etc.
  • Grafana Tempo switched from Protocol Buffers storage format to Apache Parquet last year.
  • Supports a schema and compression; being columnar makes compression easier (processing speed + storage reduction)
  • See Delta Lake vs. Parquet Comparison | Delta Lake
  • Format

    • Hierarchically: file > row group(s) > column chunk(s) (one per column per row group) > page(s) (see the inspection sketch at the end of this section)
    • Uses the record shredding and assembly algorithm described in the Dremel paper to convert nested schema to columns.
    • Data is stored in batches(“row groups”)
    • Row Group = Batch = column(s)
    • Each column can contain compressed data and metadata
    • By itself it does not support appends, i.e. Parquet files are immutable. See Delta Lake for append.
  • Hive Style Partitions

    # writing (assuming `applications` is a pandas DataFrame and `my_schema` a pyarrow schema)
    import pandas as pd

    applications.to_parquet('somedir', schema=my_schema, partition_cols=['INCOME_TYPE'])

    # reading a single partition directly via its directory path
    pd.read_parquet('somedir/INCOME_TYPE=Working/')
    # OR (this discovers/lists the whole dataset first, then prunes to the matching partition, unlike the direct path above)
    pd.read_parquet('somedir', filters=[('INCOME_TYPE', '=', 'Working')])
    # OR
    pd.read_parquet('somedir')  # read the whole thing
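
To make the file > row group > column chunk hierarchy and the min/max statistics concrete, here is a small inspection sketch with pyarrow (the file path is hypothetical):

    import pyarrow.parquet as pq

    pf = pq.ParquetFile("data/events/part-0.parquet")  # hypothetical file

    meta = pf.metadata
    print(meta.num_row_groups, "row groups,", meta.num_columns, "columns")

    # Each row group stores one chunk per column, with min/max statistics
    # that engines use for row-group skipping.
    rg = meta.row_group(0)
    col = rg.column(0)
    print(col.path_in_schema, col.statistics)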

Apache Arrow (in memory)

  • Arrow is an in-memory data format designed for efficient data processing and interchange.
  • Arrow provides the efficient data structures and some compute kernels, like a SUM, a FILTER, a MAX etc.
  • Arrow is not a Query Engine
  • Its primary focus is to enable fast analytics on big data by providing a standardized, language-agnostic memory format that supports zero-copy reads.
  • Makes it possible to exchange a pointer to some memory so that another process can read that memory without copying it and use it further.
  • Example of DuckDB and delta-rs
    • DuckDB and delta-rs (DataFusion) both “speak” Arrow.
    • So you can read something with delta-rs and hand it over to the DuckDB engine in a format both understand (see the sketch after this list).
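
A rough sketch of that hand-off, assuming the deltalake and duckdb Python packages (the table path is hypothetical): delta-rs exposes the table as an Arrow dataset, and DuckDB can query that Arrow object in place instead of copying it into its own format first.

    import duckdb
    from deltalake import DeltaTable

    dt = DeltaTable("data/orders_delta/")  # hypothetical table path
    arrow_ds = dt.to_pyarrow_dataset()     # Arrow dataset, no materialized copy

    # DuckDB scans the Arrow dataset directly (replacement scan on the variable name).
    duckdb.sql("SELECT count(*) FROM arrow_ds").show()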

Open Table formats

Open table formats such as Delta or Iceberg were developed to serve the big data engines, such as Spark or Flink. But this is slowly changing: Delta Lake is becoming more engine-agnostic and can now be accessed from Polars and DuckDB as well.

Delta Table/Lake

  • The terms Delta “Table” and Delta “Lake” are used interchangeably here
  • Delta Lake is used to implement the Databricks lakehouse.
  • It is the “transactional layer” applied on top of the data lake “storage layer” via the transaction log.
    • Allows for ACID capabilities (see Database)
  • Delta Lake features break assumptions about data layout that might have been transferred from Parquet, Hive, or even earlier Delta Lake protocol versions.
  • What?

    • It is a folder with
      • Many parquet files, which are normally just added
      • A transaction log folder with metadata. (Storing metadata in the transaction log is more efficient than storing it in the Parquet files themselves?)
    • These Parquet files make up the latest version of the table.
    • You can imagine that readers and writers add and delete parquet files and are synchronized over the transaction log folder.
  • Why?

    • If we just have plain Parquet files in our data lake, we’d have to write additional logic for upsert/merge etc. With something like Delta Lake, this becomes simpler (see the sketch below).
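
A hedged sketch of such an upsert with the deltalake (delta-rs) Python package, assuming a recent version that exposes the merge builder; the table path, columns, and predicate are made up. With plain Parquet files you would have to implement this read-match-rewrite logic yourself.

    import pyarrow as pa
    from deltalake import DeltaTable

    # Hypothetical incoming batch of changes.
    updates = pa.table({"id": [1, 2], "amount": [10.0, 20.0]})

    dt = DeltaTable("data/orders_delta/")  # hypothetical existing table

    (
        dt.merge(
            source=updates,
            predicate="t.id = s.id",
            source_alias="s",
            target_alias="t",
        )
        .when_matched_update_all()
        .when_not_matched_insert_all()
        .execute()
    )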

Dataframe libraries

Polars

More on Polars

Pandas

  • Uses numpy as its data representation

ETL

Meta Ideas about ETL

ETL or ELT?

  • Combination of cheap storage + easy to run compute = switch from ETL to ELT
  • In ELT, everyone just dumps the raw data into the warehouse and then does all the cleaning/transforming there with SQL.

Singer Spec (EL protocol)

  • Singer is an open-source project/protocol that provides a set of best practices for writing scripts that move data.
  • Singer uses a JSON-based specification for defining how data is extracted and loaded (EL), and it has a growing ecosystem of pre-built taps and targets
    • taps: For extracting data from various sources
    • targets: For loading data into different destinations
  • Issue with Singer is tap/target quality. The spec itself is fine, but the tap/targets written by the community need to be more standardized.
  • Singer is also a CLI tool??

Reverse ETL? (confirm?)

  • This is basically syncing data from the analytical DB back into operational/transactional systems

Data Pipeline Frameworks (for EL, Extract and Load)

Name          | Singer? | OSS/Selfhost? | Cloud/Paid?
Stitch        | YES     | NO            | YES (less)
Meltano       | YES     | YES           | Not yet
PipelineWise  | YES     | YES           | NO
Fivetran      | NO      | NO            | YES (very)
Airbyte       | NO      | YES           | YES
Hevodata      | NO      | NO            | YES
  • Cloud offerings usually have orchestration built in, so need not worry about that.
  • A lot of Meltano extractors/loaders are Airbyte wrappers

Singer spec based

  • Stitch

    • Stitch obviously gets a lot of use out of Singer.
    • Stitch, of course, inherits all of Singer’s problems, since it uses Singer.
  • Meltano

    • Implementation of the Singer spec
    • Singer SDK: allows you to create taps and targets for sources without grokking the Singer spec. This tries to overcome the issues around tap/target quality.
    • Meltano also inherits all Singer issues.
  • PipelineWise

    • Not very popular but is in the same bucket

Airbyte

  • Airbyte data protocol (still compatible with Singer’s one so you can easily migrate any tap to Airbyte)
  • Trying to build a better standard and connector ecosystem than Singer.
  • Doesn’t have an official CLI; everything happens over API calls. For automation, it suggests a Terraform provider.

Fivetran

  • Fivetran is an ELT tool and a direct competitor to Stitch

DltHub

  • dlt is the new kid in town (see the minimal sketch after this list)
  • When using with dagster, a `dlt` pipeline would be a dagster asset
  • You can use singer taps with dlt, but it makes for a bad experience.
  • dlt is upstream of dbt, dlt loads data, dbt transforms it.
  • But they’re sort of leaning on GPT/LLM features for some reason; I don’t understand why
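
A minimal dlt sketch, assuming the dlt package with the duckdb destination installed; the pipeline/dataset/table names and the inline data are made up. With dagster, roughly this pipeline would be wrapped as an asset.

    import dlt

    # Hypothetical source data; in practice this would be a dlt source/resource
    # wrapping a REST API, database, etc.
    rows = [{"id": 1, "name": "Ada"}, {"id": 2, "name": "Grace"}]

    pipeline = dlt.pipeline(
        pipeline_name="demo_pipeline",
        destination="duckdb",
        dataset_name="raw",
    )

    load_info = pipeline.run(rows, table_name="users")
    print(load_info)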

Data Transformation Frameworks

Examples

  • DBT (T, Transform)

    • Working in isolation on a shared database server was complicated; dbt was introduced as a solution to this.
    • Writing, versioning, testing, deploying, and monitoring all that complicated SQL is very challenging. This is what DBT is good for. DBT doesn’t replace SQL, it simply augments it by allowing you to templatize, modularize, figure out all the inter-dependencies, etc.
    • Why DBT? You would generally use dbt as a transform framework for the same reason you’d use Ruby on Rails or Django etc as a web framework
    • The unreasonable effectiveness of dbt
    • Analogy (with MVC, See Design Patterns)
      • Model : source
      • Controller : ephemeral model
      • View : materialized (view, table, incremental) model
    • DBT Gotchas
      • The filename in the models/ directory is the table name that the select statement will write to
      • Currently only works with .yml files, not .yaml
      • DBT is not an EL/ingestion tool; the source and target need to be in the same database. I.e., you can’t transform data from PostgreSQL to DuckDB: you’d have to do the loading/ingestion step separately, then do the transformation in whichever DB you prefer. After the transformation is done, you can decide where to store the transformed data, i.e. the target.

Anomaly detection

Processing/Ingestion Types

While batch/stream processing and batch/stream ingestion are two completely different things, this section talks about all of it in general.

Batch processing

Stream processing

See Ingestion Patterns

  • Streaming use cases are defined by the consumption SLA of the data, not the production. 🌟
  • This can mean different things for different people, e.g. for some ingestion pipelines, streaming might just mean anything ingesting more often than every ten minutes.

Update patterns

See https://docs.delta.io/2.0.2/delta-update.html#-write-change-data-into-a-delta-table

CDC

SCD Type2

Problems / Challenges

  • De-duplication
  • Schema drift, column type change in source

Information retrieval

See Information Retrieval

Hotspots

  • Horizontally scaled, but everything still hits the same node (a hot partition). A UUID instead of an incremental primary key would be a saver here.

Cold start

  • If the cache crashes, all the high traffic goes to the DB and the DB can crash too, so we need some way to warm up the cache beforehand (see the sketch below).
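
A rough sketch of the warm-up idea, assuming Redis as the cache; load_hot_keys_from_db and fetch_from_db are hypothetical helpers. The point is to pre-populate the hottest entries from the database before routing traffic back through the cache.

    import json

    import redis


    def load_hot_keys_from_db() -> list[str]:
        """Hypothetical: return the keys that see the most traffic."""
        return ["user:1", "user:2"]


    def fetch_from_db(key: str) -> dict:
        """Hypothetical: read the record behind `key` from the database."""
        return {"key": key}


    def warm_cache() -> None:
        r = redis.Redis()
        # Pre-populate the hottest entries so a restarted cache doesn't send
        # a thundering herd of misses straight to the database.
        for key in load_hot_keys_from_db():
            r.set(key, json.dumps(fetch_from_db(key)), ex=3600)


    if __name__ == "__main__":
        warm_cache()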

TODO Layered architecture

There is no one fits all architecture that makes sense for all situations. In fact it is mostly the opposite. The role of an architect is to know how to pick and choose patterns/technologies/methodologies to best solve a problem. The worst architects know one way of doing things and force it into every situation.

The medallion architecture is one way to architect a data flow. Sometimes 3 stages is good. Sometimes you just need one. Maybe some complicated ones need 5. Also the stages don’t necessarily need to be physically separate. A silver layer consisting only of views is perfectly valid.

And for the love of money don’t call the layers bronze, silver and gold. I’ve moaned before about it, it is meaningless to users/engineers… like calling 1, 2 and 3. Give the layers meaning to what you intend to use them for. Raw, validated, conformed, enriched, curated, aggregated … people will know exactly what that layer means just by the name.

  • Reddit users

Medallion architecture

  • bronze/raw/staging/landing
    • for data you have little to no control over, like data imported from external sources
    • We may or may not retain these
  • silver/intermediate/transform
    • your main work area, the bulk of tables go to this layer, little to no guarantee about forward/backward compatibility with external systems
    • Minimal conversion to Delta Table. Minimal or no typing. Append only; may add columns for source file metadata, date, or batch job id to rows. Will generally be about 1/5th the size of raw.
  • gold/final
    • data that you expose to external system: optimized for queries, strict schema compatibility, etc

How many layers to have?

  • Sometimes you might just need bronze -> gold, sometimes the problem might demand something more than these 3 layers etc.

What format should data be in each layer?

  • The format could be anything; the typical convention is that raw will have Parquet/JSON etc., silver will have something like Delta tables, and gold will be some fast-query DB.
  • But all three could also be the same, e.g. all three could be different Postgres tables, or all three could be Delta Lake tables.