tags : Data Engineering, Batch Processing Patterns, Kafka

FAQ

How does differential dataflow relate to stream processing?

A problem statement for restaurant orders :)

Let’s assume we are building a data pipeline for Uber Eats where we keep getting orders. We want a dashboard which shows each restaurant owner:

  • How many orders?
  • What are the Top 3 ordered dishes (with order count)?
  • What’s the total sale amount?

There are 3 buttons on the UI which allow the users (restaurant owners) to see these numbers for different time periods:

  • Last 1 hour
  • Last 24 hours (= 1 day)
  • Last 168 hours (= 24h * 7 = 1 week)

The dashboard should get new data every 5 minutes. How do you collect, store and serve data?

See https://www.reddit.com/r/dataengineering/comments/18ciwxo/how_do_streaming_aggregation_pipelines_work/

Solution

There are techniques most data systems offer to handle periodic (5m, 1h, etc) computation on incoming data (from Kafka, etc).

- Dynamic tables on Flink
- Dynamic Tables on Snowflake (as u/fhoffa mentioned)
- Materialized views on ClickHouse
- Delta Live Tables on Databricks

The underlying concept is that these systems store metrics state (either in memory or on disk or another db) and update them as new data comes in. In our case they will update per hour and then dump it into a place to be read from later.

If you partition your data by hour, one thing to note is that the metrics specified here are additive (they can be summed across hours; e.g. the 24-hour order count is just the sum of 24 hourly counts). Non-additive metrics (such as unique customer ID counts) require more memory or other techniques such as HyperLogLog.

While most systems can handle these for you, it’s good to know the underlying concept, since that will impact memory usage/cost. Hope this helps. LMK if you have any questions.

What is it really?

Rethinking Stream Processing and Streaming Databases - RisingWave: Open-Source Streaming SQL Platform

Stream processing and streaming ingestion are different things. Some systems do one, some do both, partially or fully, each with its own tradeoffs.

Streaming ingestion

  • This simply means ingesting into something in a streaming fashion (it could be an OLAP database)
  • It has to deal with things like de-duplication (OLAP systems typically don’t enforce a primary key), as well as schema and datatype changes in the source table, etc.
  • Along with streaming ingestion, a lot of OLAP databases allow for continuous processing (different from stream processing)
    • Underlying concept is that these systems store metrics state and update them as new data comes in.
    • Some examples are: dynamic tables on Flink, Dynamic Tables on Snowflake, materialized views on ClickHouse, Delta Live Tables on Databricks, etc.

Stream processing

Stream processing systems are designed to give quick updates over a pre-defined query. To achieve that, incremental computation lies at the core of the design principle of stream processing. Instead of rescanning the whole data repository, stream processing systems maintain some intermediate results to compute updates based only on the new data. Such intermediate results are also referred to as states in the system. In most cases, the states are stored in memory to guarantee the low latency of stateful computation. Such a design makes stream processing systems vulnerable to failures. The whole stream processing pipeline will collapse if the state on any node is lost. Hence, stream processing systems use checkpoint mechanisms to periodically back up a consistent snapshot of global states on an external durable storage system. If the system fails, it can roll back to the last checkpoint and recover its state from the durable storage system.

  • In most cases, they’re more flexible and capable than OLAP DBs’ continuous processing features. So they sit in front of the streaming ingestion into the OLAP store, making sure the OLAP gets cleaned and processed data.
    • But in most cases you might not even need stream processing; whatever continuous processing features OLAP DBs have might be enough, and they’re great imo in 2024.

Architectures

Tech Comparison

Comparison table

| Tech Name | RT Stream Processor | DBMS | RT Stream Ingestion | What? |
|---|---|---|---|---|
| Apache Flink | Yes | No | Yes, but not meant to be a DB | Can be used in front of an OLAP store |
| Benthos/Redpanda Connect | Yes | No | No | Apache Flink / RisingWave alternative |
| Proton | Yes | No | No | Stream processing for ClickHouse? |
| RisingWave | Yes | Yes (PostgreSQL based) | Yes, but not meant to be a DB | Apache Flink alternative; pretty unique in what it does. See comparison with Materialize DB |
| Materialize DB | Yes | Yes (OLAP) | Yes | Sort of combines stream processing + OLAP (more powerful than ClickHouse in that case), supports PostgreSQL logical replication! |
| BigQuery | ? | Yes (OLAP) | Yes | It’s BigQuery :) |
| Apache Druid | Not really | Yes (OLAP) | True realtime ingestion | Alternative to ClickHouse |
| Apache Pinot | No | Yes (OLAP) | Yes | Alternative to ClickHouse |
| StarTree | No | Yes (OLAP) | Yes | Managed Apache Pinot |
| ClickHouse | No | Yes (OLAP) | Yes, batched realtime ingestion | See Clickhouse |
| DuckDB | No | Quacks like it! (OLAP) | No | See DuckDB |
| Rockset | No | Yes (OLAP) | Yes | Easy-to-use alternative to Druid/ClickHouse. Handles updates :) |
| Snowflake | No | Yes (OLAP) | Yes, but not very good | - |
| NATS | No | No | - | Message broker |
| NATS Jetstream | No | No | - | Kafka alternative |
| Kafka | No | No | - | Kafka is Kafka |

Druid vs Clickhouse

ClickHouse doesn’t guarantee newly ingested data is included in the next query result. Druid, meanwhile, does – efficiently, too, by storing the newly streamed data temporarily in the data nodes whilst simultaneously packing and shipping it off to deep storage.

Risingwave vs Clickhouse

  • They can be used together

Benthos(Redpanda Connect) vs Risingwave

  • Both are stream processors
  • Risingwave is a stateful streaming processor
  • Benthos is sitting somewhere between stateless and stateful systems

NATS jetstream vs Benthos

  • NATS jetstream seems like a persistent queue (Eg. Kafka)
  • Benthos is something that would consume from that stream and do the stream processing and put the output elsewhere.

More on Redpanda

More on Redpanda Connect / Benthos

Delivery Guarantee

See Data Delivery

  • In Benthos/RP Connect, a message flows input <> process <> output; the entire pipeline needs to run e2e (i.e. the output needs to finish), then an ACK is sent back to the input, at which point it moves on to the next item in the list.
  • Dropping a message also sends an ACK, i.e. we move on to the next message.

Lifecycle

  • With run, eg. after using input:file, the process will automatically exit once the input is exhausted.
  • But if using streams, the process will keep living.
  • After success, an ACK is sent back to the input from the output, but a synchronous response is also possible with the sync_response output component.
    • sync_response only works with a few input components such as http_server
    • sync_response can be used with input:http_server (and eg. output:http_client), turning Redpanda Connect into sort of a proxy webserver.
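  • A minimal sketch of that idea (the path here is made up), assuming just the http_server input and the sync_response output: whatever comes out of the pipeline is returned as the HTTP response.

    input:
      http_server:
        path: /echo              # hypothetical path

    pipeline:
      processors:
        - mapping: root = this   # any processing we want reflected in the response

    output:
      sync_response: {}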

Message semantics

  • messages have metadata + context

Configuration, Components and Resource (Hierarchy)

  • Configuration is passed via config file/api, resources are passed in via the -r flag.
  • -r is usually used to switch resources in different environments (used as feature toggle)
  • components

    • Per rpk connect list, we sort of have components of these types:
      • Inputs
      • Processors
      • Outputs
      • Caches
      • Rate Limits
      • Buffers
      • Metrics
      • Tracers
      • Scanners
    • In some/most cases, the same component will be available for input and output but the parameters will differ etc.
    • configuration contains components
      • components are hierarchical.
          • Eg. an input can have a list of child processors attached to it, which in turn can have their own processor children.
  • resources

    • resources are components, identified by label.
      • Some components such as caches and rate limits can only be created as a resource.
    • The main use case is "reuse"; another similar concept is templates
    • Can be specified in the same configuration, can also be specified separately and passed via -r
    • Types
      • input_resources: []
      • cache_resources: []
      • processor_resources: []
      • rate_limit_resources: []
      • output_resources: []
      • They’re defined using the config keys above and used in input/pipeline/output via the resource component (see the sketch at the end of this section)
  • Other

    • Eg. crazy number of levels

      processors:
        - resource: foo # Processor that might fail
        - switch:
            - check: errored()
              processors:
                - resource: bar # Recover here
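  • A minimal sketch of resources in practice (the labels, mapping and interval here are made up): components defined under the *_resources keys and referenced by label via the resource component.

    processor_resources:
      - label: clean_up
        mapping: |
          root = this
          root.internal = deleted()

    cache_resources:
      - label: dedupe_cache      # caches can only be created as resources
        memory:
          default_ttl: 60s

    input:
      generate:
        interval: 1s
        mapping: root = { "id": uuid_v4() }

    pipeline:
      processors:
        - resource: clean_up

    output:
      stdout: {}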

Batching

  • Batching at input vs output

    • It’s a little confusing: some components apparently perform better when batched at the input, and some at the output. If you want to do “batch processing”, then you’d want to batch at the input.
      • If input itself doesn’t support batching, we can use broker and make the input or multiple input support batching at input stage.
      • broker also works similarly for output
  • Creating and breaking batches

    • A batch policy (an input/output having a batching attribute) has the capability to create batches, but not to break them down (see the sketch at the end of this section).
      • It also takes in processors, in case you want to pre-process before sending out the batch.
    • split breaks batches
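  • A minimal sketch of an output batch policy (the broker address and topic are made up); split is what you’d use to break batches apart again.

    output:
      kafka:
        addresses: [ "localhost:9092" ]
        topic: orders
        batching:
          count: 50            # flush after 50 messages...
          byte_size: 1048576   # ...or ~1 MiB...
          period: 5s           # ...or 5 seconds, whichever comes first
          processors:
            - archive:
                format: lines  # optional pre-processing before the batch is sent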

Processor (array of processors in input/pipeline/output section)

  • Processors are not just limited to the pipeline section.
  • The native way of processing/manipulating data is Bloblang (via mapping or mutation)
    • NOTE: the mapping processor was previously called the bloblang processor (now deprecated)
  • Data Enrichment

    • Branching (see the sketch at the end of this item)

    • Workflow

    • Caching

      There’s cache and then there’s cached

      • cache

      • cached
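    • A minimal branch sketch (the URL and field names are made up): enrich the document via request_map/result_map without replacing the original message.

      pipeline:
        processors:
          - branch:
              request_map: root.id = this.user_id    # what the child processors receive
              processors:
                - http:
                    url: https://example.com/users/lookup   # hypothetical enrichment endpoint
                    verb: POST
              result_map: root.user = this            # merge the response back into the original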

  • Batching & buffer

    • Basics

      • buffer is how we handle batching
      • buffer can be: window buffer, batch buffer
      • Using a buffer might alter the delivery guarantees (it might drop things), so be cautious.
    • Transactions and Buffering

      See Database Transactions

      • input -> processing -> output is a transaction (an ACK is sent to the input after output finishes)
      • But when we use buffering, the semantics of this "transaction" change.
    • Pure Batching

      • config needed: count, byte_size, period
    • Windowing / Window Processing

      • Tumbling window: no overlap
      • Sliding window: overlap
      • config needed: the system_window buffer plus the group_by_value processor (see the sketch below)
      • This looks like, but is NOT, stateful stream processing; Benthos is stateless (debatable).
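      • A minimal windowing sketch (broker, topic and field names are made up), assuming the system_window buffer and the group_by_value processor:

        input:
          kafka:
            addresses: [ "localhost:9092" ]
            topics: [ orders ]
            consumer_group: window_demo

        buffer:
          system_window:
            timestamp_mapping: root = now()
            size: 60s                # tumbling 1-minute windows; add `slide` for sliding windows

        pipeline:
          processors:
            - group_by_value:
                value: ${! json("restaurant_id") }   # one sub-batch per restaurant per window
            - mapping: |
                root = if batch_index() == 0 {
                  { "restaurant_id": this.restaurant_id, "order_count": batch_size() }
                } else { deleted() }

        output:
          stdout: {}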

Exception handling

  • Works with processors and outputs, not with inputs??? (DOUBT)
    • retry and drop work for both processor and output
    • reject and dlq work only for output
  • Exception/Error creation

    • An error can be triggered by whatever happens during processing
    • We can, however, have some control over how custom errors are created using blobl; otherwise, if the processor itself errors out, that works too.
  • Catching / Try / Drop

    • try (abandon): if any of the processors in the sequence fails, “skip” the rest of it.
    • retry (unlimited attempts)
      • This is similar to try in that it’s a blanket processor, but try simply takes in a list of processors, while retry takes in some more params along with the list of processors.
    • catch (handle): catch the message/event w error and handle it somehow.
      • catch can be emulated with switch:check:errored() (this is desirable if you want to maintain the error flag after success)
      • This is the catch processor, which is different from the catch method used in Bloblang.
      • catch can then send off the error() to log or remap the new document with the error message etc.
  • Dropping message

    • mapping + errored() + deleted() : Dropping at processing stage. I personally would suggest dropping at output stage.
    • reject_errored: (Dropping at output stage)
      • rejects messages that have failed their “processing” steps
        • If input supports NACK/NAK, it’ll be sent a NAK at protocol level, and that message will NOT be re-processed.
        • Successful messages/events are sent to the child output
        • Otherwise they’ll simply be reprocessed, unless we use fallback and set up a DLQ mechanism (see the sketch at the end of this section).
    • switch + errored() + error(): Maximum flexibility while handling how to route failed/errored messages.
    • broker can also be used to route messages accordingly.
  • Fallback (eg. DLQ) and Batching

    • Redpanda Connect makes a best attempt at inferring which specific messages of the batch failed, and only propagates those individual messages to the next fallback tier.
    • However, depending on the output and the error returned it is sometimes not possible to determine the individual messages that failed, in which case the whole batch is passed to the next tier in order to preserve at-least-once delivery guarantees.
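  • A minimal error-routing sketch (the topics and the might_fail resource are hypothetical): annotate failures with switch + errored() (which keeps the error flag set), then use reject_errored inside a fallback output as a DLQ.

    pipeline:
      processors:
        - resource: might_fail              # hypothetical processor that can error
        - switch:
            - check: errored()
              processors:
                - mapping: |
                    root = this
                    root.failure_reason = error()   # attach the error message, error flag stays set

    output:
      fallback:
        - reject_errored:                   # failed messages are rejected by this tier...
            kafka:
              addresses: [ "localhost:9092" ]
              topic: orders_clean
        - kafka:                            # ...and land in the DLQ tier instead
            addresses: [ "localhost:9092" ]
            topic: orders_dlq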

Development Tips 🌟

  • Test script

    input:
      stdin:
        scanner:
          lines: {}
    output:
      stdout: {}


Bloblang

  • 3 ways to use Bloblang in a pipeline:
  • The most common use case is via mapping. Some of the use cases of mapping overlap with the jq processor and so on.
  • Works in processors (proper) and in outputs (via interpolation only)
    • Processor
      • processor:bloblang
      • processor:mapping
      • processor:branch:request_map/result_map
      • processor:<other processors> , eg. switch processor has a check attribute which takes in a blobl query.
    • Output (via interpolation)
  • Additionally, mappings can be evaluated using rpk connect blobl (or its server mode)

  • Interpolation & Coalescing

    • Env var: ${ENV_VAR_NAME}
    • Interpolation of blobl: ${!<bloblang expression>}
    • Coalescing: root.contents = this.thing.(article | comment | share).contents | "nothing"
  • Variables

    • Declared using let & referenced using $
    • Metadata is accessed via interpolation or via @ inside a mapping
    • Log all metadata

      - log:
          level: INFO
          fields_mapping: |-
            root.metadata = meta()
  • Conditionals

    • all conditionals are expressions
    • if: can be used as a standalone statement and also as (the right-hand side of) an assignment
      # this works
      root.pet.treats = if this.pet.is_cute {
        this.pet.treats + 10
      } else {
        deleted()
      }

      # this also works
      if this.pet.is_cute {
        root.pet.treats = this.pet.treats + 10
        root.pet.toys = this.pet.toys + 10
      }
    • match: only usable as an assignment
      • Also accepts a parameter which alters the this context.
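    • A minimal match sketch (the fields here are made up); note how the parameter re-points this inside the block:

      root.sound = match this.pet {
        this.type == "cat" => "meow",
        this.type == "dog" && this.is_cute => "woof!",
        _ => "???",
      }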
  • Error handling & creation

    • It seems like error handling is only supported in pipeline block and not in input block processors. (have to confirm)
    • Use of catch
    • Error handling in Redpanda Connect/Benthos is different from blobl error handling. These concepts exist at different levels of the stack.
    • We create errors either via throw() or via helper methods which validate and coerce fields (eg. number, not_null, not_empty, etc.)
  • TODO Functions vs Methods

    • Methods: Specific to datatypes/structured inputs etc. Eg. string.split etc
    • Functions: Independent functions
  • Custom Methods

    • We create custom methods by using map keyword.
      • this inside map refers to the parameter of apply
      • root inside map refers to new value being created for each invocation of the map
    • Can be applied to single values or to the entire object recursively.
      # use map for a specific case
      map parse_talking_head {
        let split_string = this.split(":")
      
        root.id = $split_string.index(0)
        root.opinions = $split_string.index(1).split(",")
      }
      
      root = this.talking_heads.map_each(raw -> raw.apply("parse_talking_head"))
      
      # apply for the whole document recursively
      map remove_naughty_man {
        root = match {
          this.type() == "object" => this.map_each(item -> item.value.apply("remove_naughty_man")),
          this.type() == "array" => this.map_each(ele -> ele.apply("remove_naughty_man")),
          this.type() == "string" => if this.lowercase().contains("voldemort") { deleted() },
          this.type() == "bytes" => if this.lowercase().contains("voldemort") { deleted() },
          _ => this,
        }
      }
      
      root = this.apply("remove_naughty_man")
  • this & named context

    • this points to the “context”
    • Usually it points to the input document, but based on usage it can point to other things. Eg. with match this.pet, inside the match block this points to what the outer this.pet pointed to, reducing boilerplate inside the block.
    • Instead of this, we can also use named context with <context name> -> <blobl query>
  • this and root & mapping

    • mapping “most of the time” refers to mapping JSON(structured) documents (Both new and input documents)
    • mapping purpose: construct a brand new document by using an input document as a reference
      • Uses assignments, everything is an assignment, even deletion!
    • components
      • root: referring to the root of the new document
      • this: referring to the root of the input document
    • In a mapping, on the LHS, if you omit root, it’ll still be root of the new document. i.e root.foo is same as foo on lhs.
    • Non JSON documents

      • this (“this” is JSON specific)
        • this keyword mandates that the current in-flight message is in JSON format
        • You should first ensure that your message is valid JSON by using log processor (if it’s not, you can use the content() function in a mapping processor to extract the message data and parse it somehow)
      • content()
        • Can be used to map the input as a raw binary string. Eg. root = content().uppercase() vs root = this: the results can look similar, but this is JSON-specific, while content() reads the raw data.
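        • A minimal sketch of that (the fallback shape is made up), using content() plus the catch method to handle possibly non-JSON payloads:

          root = content().string().parse_json().catch({ "raw": content().string() })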

Other notes

  • @ vs meta
    • root.poop = meta("presigned_url") # string
    • root.doop = @presigned_url # binary

things ppl say

  • We use debezium to stream CDC from our DB replica to Kafka.
    • We have various services reading the CDC data from Kafka for processing

Resources