tags : Data Engineering, Batch Processing Patterns, Kafka
FAQ
How does differential dataflow relate to stream processing?
A problem statement for restaurant orders :)
Let’s assume we are building a data pipeline for Uber Eats where we keep getting orders. We want a dashboard that shows each restaurant owner:
- How many orders?
- What are the Top 3 ordered dishes (with order count)?
- What’s the total sale amount?
There are 3 buttons on the UI which allow the users (restaurant owners) to see these numbers for different time periods:
- Last 1 hour
- Last 24 hours (= 1 day)
- Last 168 hours (= 24h * 7 = 1 week)
The dashboard should get new data every 5 minutes. How do you collect, store and serve data?
Solution
There are techniques most data systems offer to handle periodic (5m, 1h, etc) computation on incoming data (from Kafka, etc).
- Dynamic table on Flink
- Dynamic table on Snowflake (as fhoffa mentioned)
- Materialized views on Clickhouse
- Delta Live Tables on Databricks
The underlying concept is that these systems store metrics state (either in memory or on disk or another db) and update them as new data comes in. In our case they will update per hour and then dump it into a place to be read from later.
If you partition your data by hour, one thing to note is that the metrics you specified are additive (they can be summed across hours); if they are non-additive (such as unique customer id counts), that will require more memory (or other techniques such as HyperLogLog).
While most systems can handle these for you, it’s good to know the underlying concept, since that will impact memory usage/cost. Hope this helps. LMK if you have any questions.
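As a rough illustration of the “keep hourly state and emit additive partials” idea, here is a hedged Redpanda Connect sketch (the tool is covered later in this note). The Kafka topic, broker address and field names (`restaurant_id`, `amount`, `ordered_at`) are assumptions, and the Top-3 dishes metric is left out for brevity; double-check field names against the docs before relying on this.

```yaml
# One tumbling 1h window; emits per-restaurant partials that can be summed
# across 1 / 24 / 168 hourly buckets on the serving side.
input:
  kafka:
    addresses: [ "localhost:9092" ]
    topics: [ "orders" ]
    consumer_group: "restaurant_metrics"

buffer:
  system_window:
    timestamp_mapping: root = this.ordered_at
    size: 1h

pipeline:
  processors:
    - group_by_value:
        value: '${! json("restaurant_id") }'   # one batch per restaurant per hour
    - mapping: |
        root = if batch_index() == 0 {
          {
            "restaurant_id": this.restaurant_id,
            "hour_end": @window_end_timestamp,
            "order_count": batch_size(),
            "total_sale_amount": json("amount").from_all().sum()
          }
        } else { deleted() }

output:
  stdout: {}   # in practice: write to an OLAP table keyed by (restaurant_id, hour)
```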
What is it really?
`Stream processing` and `streaming ingestion` are different things. There are systems which do one, and there are systems which do both, partially or fully; all tradeoffs.
Streaming ingestion
- This simply means ingesting into something in a streaming fashion (could be an OLAP database).
- This has to deal with things like de-duplication (as OLAP systems don’t have a primary key, etc.), and schema/datatype changes in the source table, etc.
- Along with streaming ingestion, a lot of OLAP databases allow for continuous processing (different from stream processing).
  - The underlying concept is that these systems store metric state and update it as new data comes in.
  - Some examples: Dynamic tables on Flink, Dynamic tables on Snowflake, Materialized views on Clickhouse, Delta Live Tables on Databricks, etc.
Stream processing
Stream processing systems are designed to give quick updates over a pre-defined query. To achieve that, incremental computation lies at the core of the design principle of stream processing. Instead of rescanning the whole data repository, stream processing systems maintain some intermediate results to compute updates based only on the new data. Such intermediate results are also referred to as states in the system. In most cases, the states are stored in memory to guarantee the low latency of stateful computation. Such a design makes stream processing systems vulnerable to failures. The whole stream processing pipeline will collapse if the state on any node is lost. Hence, stream processing systems use checkpoint mechanisms to periodically back up a consistent snapshot of global states on an external durable storage system. If the system fails, it can roll back to the last checkpoint and recover its state from the durable storage system.
- In most cases, they’re more flexible and capable than the continuous processing features of OLAP DBs. So they sit in front of the streaming ingestion into the OLAP store, making sure the OLAP store gets cleaned and processed data.
- But in most cases you might not even need stream processing; whatever continuous processing features OLAP DBs have might be enough, they’re great imo in 2024.
Architectures
- Lambda: here we run real-time and historical (batch) paths separately and merge their results together.
- Kappa: streaming path only; for when you don’t need a lot of historical data and only need streaming data.
Tech Comparison
Comparison table
| RT Stream Processor | DBMS | Tech Name | RT Stream Ingestion | What? |
| --- | --- | --- | --- | --- |
| Yes | No | Apache Flink | Yes, but not meant to be a DB | Can be used in front of an OLAP store |
| Yes | No | Benthos/Redpanda Connect | No | Apache Flink, RisingWave alternative |
| Yes | No | Proton | No | Stream processing for Clickhouse? |
| Yes | Yes (PostgreSQL based) | RisingWave | Yes, but not meant to be a DB | Apache Flink alternative, it’s pretty unique in what it does. See comparison w/ Materialize DB |
| Yes | Yes (OLAP) | Materialize DB | Yes | Sort of combines stream processing + OLAP (more powerful than Clickhouse in that case), supports PostgreSQL logical replication! |
| ? | Yes (OLAP) | BigQuery | Yes | It’s BigQuery :) |
| Not really | Yes (OLAP) | Apache Druid | True realtime ingestion | Alternative to Clickhouse |
| No | Yes (OLAP) | Apache Pinot | Yes | Alternative to Clickhouse |
| No | Yes (OLAP) | StarTree | Yes | Managed Apache Pinot |
| No | Yes (OLAP) | Clickhouse | Yes, batched realtime ingestion | See Clickhouse |
| No | Quacks like it! (OLAP) | DuckDB | No | See DuckDB |
| No | Yes (OLAP) | Rockset | Yes | Easy-to-use alternative to Druid/Clickhouse. Handles updates :) |
| No | Yes (OLAP) | Snowflake | Yes, but not very good | - |
| No | No | NATS | - | Message broker |
| No | No | NATS Jetstream | - | Kafka alternative |
| No | No | Kafka | - | Kafka is Kafka |
Druid vs Clickhouse
ClickHouse doesn’t guarantee newly ingested data is included in the next query result. Druid, meanwhile, does – efficiently, too, by storing the newly streamed data temporarily in the data nodes whilst simultaneously packing and shipping it off to deep storage.
- Druid is more “true” realtime than Clickhouse in this sense, but that much realtime is usually not needed in most of my cases.
- What Is Apache Druid And Why Do Companies Like Netflix And Reddit Use It? - YouTube
Risingwave vs Clickhouse
- They can be used together
Benthos(Redpanda Connect) vs Risingwave
- Both are stream processors
- Risingwave is a stateful streaming processor
- Benthos sits somewhere between stateless and stateful systems
NATS jetstream vs Benthos
- NATS Jetstream seems like a persistent queue (e.g. Kafka)
- Benthos is something that would consume from that stream and do the stream processing and put the output elsewhere.
More on Redpanda
- See Kafka
More on Redpanda Connect / Benthos
- https://github.com/weimeilin79/masterclass-connect
- https://graphql-faas.github.io/benthos/concepts.html
Delivery Guarantee
See Data Delivery
- In benthos/rp connect, `input <> process <> output`: the entire pipeline needs to run e2e (i.e. the `output` needs to finish), then an `ACK` is sent back to the input, at which point it moves on to the next item in the list.
- Dropping a message also sends an `ACK`, i.e. we move on to the next message etc.
Lifecycle
- With `run`, e.g. after using `input: file`, the process will automatically kill itself once the input is exhausted.
- But if using `streams` mode, the process will keep living.
- After “success” an ACK is sent back to the input from the output, but a synchronous response is also possible with the `sync_response` output component.
  - `sync_response` only works with a few input components, such as `http_server`.
  - `sync_response` can be used with `input: http_server` and `output: http_client`, turning Redpanda Connect into sort of a proxy webserver (see the sketch below).
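A minimal sketch of the proxy-ish setup, using only the `http_server` input and `sync_response` output (the `mapping` step is a placeholder; 4195 is the default service HTTP port):

```yaml
# POST a payload to http://localhost:4195/post and the processed
# result is returned to the caller in the HTTP response.
input:
  http_server:
    path: /post

pipeline:
  processors:
    - mapping: |
        root = this
        root.processed = true

output:
  sync_response: {}
```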
Message semantics
- Messages have `metadata` + `context`.
Configuration, Components and Resources (Hierarchy)
- Configuration is passed via config file/API; `resources` are passed in via the `-r` flag.
  - `-r` is usually used to switch resources in different environments (used as a feature toggle).
- `components`
  - Per `rpk connect list`, we sort of have components for these:
    - Inputs
    - Processors
    - Outputs
    - Caches
    - Rate Limits
    - Buffers
    - Metrics
    - Tracers
    - Scanners
- In some/most cases, the same `component` will be available for `input` and `output`, but the parameters will differ etc.
- A `configuration` contains `components`; `components` are hierarchical.
  - E.g. an `input` can have a list of child `processor`s attached to it, which in turn can have their own `processor` children.
- `resources`
  - `resources` are `components`, identified by `label`.
    - Some `components` such as `caches` and `rate limits` can only be created as a `resource`.
  - Main usecase is “reuse”; another similar concept is `templates`.
  - Can be specified in the same configuration, or specified separately and passed via `-r`.
  - Types:

    ```yaml
    input_resources: []
    cache_resources: []
    processor_resources: []
    rate_limit_resources: []
    output_resources: []
    ```

  - They’re defined using the above config and used in `input/pipeline/output` via the `resource` component.
- Other
  - E.g. crazy number of levels:

    ```yaml
    processors:
      - resource: foo # Processor that might fail
      - switch:
          - check: errored()
            processors:
              - resource: bar # Recover here
    ```
Batching
- Batching at `input` vs `output`
  - It’s a little confusing: some components apparently perform better when batched at input, and some at output. If you want to do “batch processing” then you’d want to batch in the `input`.
    - If the `input` itself doesn’t support batching, we can use `broker` to make the input (or multiple inputs) support batching at the input stage (see the sketch below). `broker` also works similarly for `output`.
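A rough sketch of the `broker` trick for input-side batching (the `generate` input is just a stand-in for a real source):

```yaml
input:
  broker:
    inputs:
      - generate:
          interval: 100ms
          mapping: root.id = uuid_v4()
    batching:
      count: 10     # flush a batch every 10 messages...
      period: 2s    # ...or every 2 seconds, whichever comes first

output:
  stdout: {}
```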
- Creating and breaking batches
  - A batch policy (an input/output having a `batching` attribute) has the capability to create batches, but not to break them down.
    - It also takes in `processors`, in case you want to pre-process before sending out the batch.
  - `split` breaks batches.
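Roughly, creating batches at the output vs breaking them in the pipeline might look like this (topic and broker address are placeholders):

```yaml
pipeline:
  processors:
    - split:
        size: 1    # break any incoming batch back into single messages

output:
  kafka:
    addresses: [ "localhost:9092" ]
    topic: "orders_out"
    batching:          # batch policy: creates batches
      count: 20
      period: 5s
      processors:
        - archive:
            format: json_array   # optional pre-processing: pack the batch into one JSON array message
```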
Processor (array of processors in the `input/pipeline/output` section)
- `processor`s are not just limited to the `pipeline` section.
- The native way of doing processing/manipulating shit is via `bloblang` (via `mapping` or `mutation`).
  - NOTE: the `mapping` processor was previously called the `bloblang` processor (now deprecated).
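A quick sketch showing processors attached to the `input` as well as the `pipeline`, and `mutation` (edit in place) vs `mapping` (build a new document); the field names are made up:

```yaml
input:
  generate:
    interval: 1s
    mapping: |
      root = {"name": "ada", "visits": 1}
  processors:
    - mutation: |
        root.visits = this.visits + 1       # in-place edit on the input side

pipeline:
  processors:
    - mapping: |
        root.greeting = "hello " + this.name   # builds a brand new document
        root.visits = this.visits

output:
  stdout: {}
```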
- Data Enrichment
- Branching
- Workflow
- Caching
  - There’s `cache` and then there’s `cached`
    - `cache`
    - `cached`
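Roughly how the two differ; this is a hedged sketch with field names from memory (double-check against the docs), and the enrichment URL and `user_id` key are made up:

```yaml
cache_resources:
  - label: mycache
    memory: {}   # caches can only be declared as resources

pipeline:
  processors:
    # `cached`: runs the child processors once per key and reuses the cached result
    - cached:
        cache: mycache
        key: '${! json("user_id") }'
        processors:
          - http:
              url: 'https://example.com/users/${! json("user_id") }'
              verb: GET
    # `cache`: explicit get/set/delete operations against a cache resource
    - cache:
        resource: mycache
        operator: set
        key: 'seen:${! json("user_id") }'
        value: "true"
```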
- Batching & buffer
  - Basics
    - `buffer` is how we handle `batching`.
    - A `buffer` can be: a window buffer, a `batch` buffer.
    - Using a `buffer` might alter the delivery guarantees etc., might drop things; be cautious.
- Transactions and Buffering
  - `input -> processing -> output` is a transaction (an ACK is sent to the input after the output finishes).
  - But when we use buffering, the semantics of this “transaction” change.
- Pure Batching
  - Config needed: `count`, `size`, `period`.
- Windowing / Window Processing
  - `Tumbling window`: no overlap.
  - `Sliding window`: overlap.
  - Config needed: `system_window` & `group_by_value` in the processor.
  - This seems like, but is NOT, `stateful stream processing`; benthos is stateless. (debatable)
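A small sketch of a tumbling window with `system_window` plus a per-window summary (the timestamp field and input are stand-ins; uncommenting `slide` would make it a sliding window):

```yaml
input:
  generate:
    interval: 1s
    mapping: |
      root.created_at = now()
      root.id = uuid_v4()

buffer:
  system_window:
    timestamp_mapping: root = this.created_at
    size: 60s
    # slide: 10s   # overlap -> sliding window

pipeline:
  processors:
    - mapping: |
        # the whole window arrives as one batch; keep a single summary message
        root = if batch_index() == 0 {
          { "window_end": @window_end_timestamp, "count": batch_size() }
        } else { deleted() }

output:
  stdout: {}
```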
- Rate limiting
  - See Rate Limiting
- Exception handling
  - Works w/ `processor` and `output`, not w/ `input`??? (DOUBT)
  - `retry` and `drop` work for both `processor` and `output`.
  - `reject` and `dlq` work only for `output`.
- Exception/Error creation
  - An exception can be triggered by whatever happens during processing.
  - We can, however, have some control over how to create custom errors using blobl; otherwise, if the processor itself errors out, that works too.
- Catching / Try / Drop
  - `try` (abandon): if any of the processors in the sequence fails, “skip” the rest of it.
  - `retry` (unlimited attempts)
    - This is similar to `try` in being a blanket processor, but `try` simply takes in a `list of processors`, while `retry` takes some more params along with the `list of processors`.
  - `catch` (handle): catch the message/event w/ an error and handle it somehow.
    - `catch` can be emulated with `switch: check: errored()` (this is desirable if you want to maintain the error flag after success).
    - This is the `catch` processor, which is different from the `catch` function used in `bloblang`.
    - `catch` can then send off the `error()` to `log` or remap the `new document` with the error message etc. (see the sketch below)
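A rough `try`/`catch` fragment (the `payload` field is an assumption):

```yaml
pipeline:
  processors:
    - try:
        - mapping: |
            root = this
            root.parsed = this.payload.parse_json()   # flags an error if this fails
    - catch:
        - log:
            level: ERROR
            message: 'processing failed: ${! error() }'
        - mapping: root = deleted()   # e.g. drop the failed message here
```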
- Dropping messages
  - `mapping` + `errored()` + `deleted()`: dropping at the processing stage. I personally would suggest dropping at the output stage.
  - `reject_errored`: (dropping at the `output` stage)
    - Rejects messages that have failed their “processing” steps.
      - If the `input` supports `NACK/NAK`, a NAK is sent at the protocol level, and that message will NOT be re-processed.
      - Otherwise they’ll simply be reprocessed, unless we use `fallback` and set up a DLQ mechanism (see the sketch below).
    - Successful messages/events are sent to the child output.
  - `switch` + `errored()` + `error()`: maximum flexibility in how to route failed/errored messages.
    - `broker` can also be used to route messages accordingly.
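Roughly, `reject_errored` + `fallback` as a DLQ (topics and addresses are placeholders):

```yaml
output:
  fallback:
    - reject_errored:
        kafka:
          addresses: [ "localhost:9092" ]
          topic: "orders_clean"
    - kafka:
        addresses: [ "localhost:9092" ]
        topic: "orders_dlq"   # errored or undeliverable messages land here
```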
- Fallback (e.g. DLQ) and Batching
- Redpanda Connect makes a best attempt at inferring which specific messages of the batch failed, and only propagates those individual messages to the next fallback tier.
- However, depending on the output and the error returned it is sometimes not possible to determine the individual messages that failed, in which case the whole batch is passed to the next tier in order to preserve at-least-once delivery guarantees.
Development Tips 🌟
- Test script

  ```yaml
  input:
    stdin:
      scanner:
        lines: {}

  output:
    stdout: {}
  ```
Bloblang
- 3 ways to use bloblang in a `pipeline`:
  - Most common usecase is via `mapping`. Some of the usecases of `mapping` overlap with the `jq` processor and so on.
  - Works in `processors` (proper) and in `output` (via interpolation only).
  - Processor
    - `processor: bloblang`
    - `processor: mapping`
    - `processor: branch: request_map/result_map`
    - `processor: <other processors>`, e.g. the `switch` processor has a `check` attribute which takes in a blobl query.
  - Output (via interpolation)
  - Additionally, can evaluate using: `rpk connect blobl` / `server`
- Interpolation & Coalescing
  - Env var: `${ENV_VAR_NAME}`
  - Interpolation of blobl: `${!<bloblang expression>}`
  - Coalescing: `root.contents = this.thing.(article | comment | share).contents | "nothing"`
- Variables
  - Declared using `let` & referenced using `$`.
  - Metadata access via interpolation or via `@` inside `mapping` (see the sketch below).
  - Log all metadata:

    ```yaml
    - log:
        level: INFO
        fields_mapping: |-
          root.metadata = meta()
    ```
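A tiny fragment showing `let`/`$` and metadata via `@` (the `kafka_topic` metadata key assumes a Kafka input; the field names are made up):

```yaml
pipeline:
  processors:
    - mapping: |
        let full_name = this.first_name + " " + this.last_name   # declare with let
        root.name = $full_name                                    # reference with $
        root.topic = @kafka_topic                                 # metadata via @
```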
- Conditionals
  - All conditionals are expressions.
  - `if`: can be a `statement` and can also be an `assignment`.

    ```
    # this works
    root.pet.treats = if this.pet.is_cute {
      this.pet.treats + 10
    } else {
      deleted()
    }

    # this also works
    if this.pet.is_cute {
      root.pet.treats = this.pet.treats + 10
      root.pet.toys = this.pet.toys + 10
    }
    ```

  - `match`: only `assignment`.
    - Also accepts a parameter which alters the `this` context (see the sketch below).
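A small sketch of `match` taking a parameter that rebinds `this` inside the block (the `pet` document is made up):

```yaml
pipeline:
  processors:
    - mapping: |
        root.sound = match this.pet {
          this.type == "cat" => "meow",
          this.type == "dog" => "woof",
          _ => "???"
        }
```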
- Error handling & creation
  - It seems like error handling is only supported in the `pipeline` block and not in `input` block processors. (have to confirm)
  - Use of `catch`.
  - Error handling of `redpanda connect/benthos` is different from blobl error handling. These concepts exist at different levels of the stack.
  - We create errors either via `throw` or via helper functions which help in validating and coercing fields (e.g. `number`, `not_null`, `not_empty` etc.)
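A rough fragment of creating errors with `throw` and the validation helpers, plus blobl-level recovery with the `catch` method (field names are made up):

```yaml
pipeline:
  processors:
    - mapping: |
        root.age  = this.age.number()        # flags a processor-level error if not numeric
        root.name = this.name.not_empty()    # errors if empty
        root.kind = if this.kind == null { throw("missing kind") } else { this.kind }
        root.score = this.score.number().catch(0)   # blobl-level recovery, no error flag
```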
- TODO Functions vs Methods
  - Methods: specific to datatypes/structured inputs etc., e.g. `string.split()` etc.
  - Functions: independent functions.
- Custom Methods
  - We create `custom methods` by using the `map` keyword.
    - `this` inside `map` refers to the parameter of `apply`.
    - `root` inside `map` refers to the new value being created for each invocation of the map.
  - Can be applied to single values or to the entire object recursively.

    ```
    # use map for a specific case
    map parse_talking_head {
      let split_string = this.split(":")
      root.id = $split_string.index(0)
      root.opinions = $split_string.index(1).split(",")
    }
    root = this.talking_heads.map_each(raw -> raw.apply("parse_talking_head"))

    # apply for the whole document recursively
    map remove_naughty_man {
      root = match {
        this.type() == "object" => this.map_each(item -> item.value.apply("remove_naughty_man")),
        this.type() == "array" => this.map_each(ele -> ele.apply("remove_naughty_man")),
        this.type() == "string" => if this.lowercase().contains("voldemort") { deleted() },
        this.type() == "bytes" => if this.lowercase().contains("voldemort") { deleted() },
        _ => this,
      }
    }
    root = this.apply("remove_naughty_man")
    ```
- `this` & named context
  - `this` points to the “context”.
  - Usually it points to the `input document`, but based on usage it can point to other things, e.g. inside a `match` block: after `match this.pet`, inside the `match` block `this` will point to the document inside the outer `this.pet`, i.e. reducing boilerplate inside the `match` block.
  - Instead of `this`, we can also use a named context with `<context name> -> <blobl query>`.
- `this` and `root` & `mapping`
  - `mapping` “most of the time” refers to mapping `JSON (structured) documents` (both the `new` and `input` documents).
  - `mapping` purpose: construct a brand `new document` by using an `input document as a reference`.
    - Uses `assignments`; everything is an assignment, even deletion!
  - Components:
    - `root`: refers to the root of the `new document`.
    - `this`: refers to the root of the `input document`.
  - In a `mapping`, on the LHS, if you omit `root`, it’s still the root of the new document, i.e. `root.foo` is the same as `foo` on the LHS.
- Non JSON documents
  - `this` (“this” is JSON specific)
    - The `this` keyword mandates that the current in-flight message is in JSON format.
    - You should first ensure that your message is valid JSON by using the `log` processor (if it’s not, you can use the `content()` function in a `mapping` processor to extract the message data and parse it somehow).
  - `content()`
    - Can be used for mapping the input as a binary string.
    - E.g. `root = content().uppercase()` vs `root = this`: the result is similar, but `this` is JSON specific, while with `content()` we’re reading the raw data.
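A tiny fragment contrasting `content()` with `this` (it treats the payload as raw bytes rather than JSON):

```yaml
pipeline:
  processors:
    - mapping: |
        root.raw   = content().string()
        root.upper = content().string().uppercase()
```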
Other notes
- @ vs meta
- `root.poop = meta("presigned_url") # string`
- `root.doop = @presigned_url # binary`
- WASM
  - I recommend using the `redpanda_data_transform` processor in connect. It’s based on WebAssembly and `rpk` can generate the boilerplate for you.
  - Follow this tutorial: https://docs.redpanda.com/current/develop/data-transforms/run-transforms/
  - But instead of using `rpk transform deploy`, point to the built wasm module using https://docs.redpanda.com/redpanda-connect/components/processors/redpanda_data_transform/ ; its `module_path` property is a path to the wasm file from `rpk transform build`.
things ppl say
- We use debezium to stream CDC from our DB replica to Kafka.
- We have various services reading the CDC data from Kafka for processing