tags : Task Queue

See NATS by Example

See Inter-System Messaging with NATS & Jetstream (with Jeremy Saenz) - YouTube

FAQ

Alternatives

The NATS documentation also has a nice comparison page

  • RabbitMQ
  • Kafka / Redpanda
  • Redis Streams
    • Redis Streams is what you reach for when you want pub/sub to be persistent
    • NATS gives you better ergonomics around this, e.g. better abstractions for acknowledging messages and then removing them from the queue, which Redis Streams expects the client to handle itself

Concepts

NATS: Core and Jetstream Overview

NATS is a high-performance messaging system. It consists of NATS Core, providing basic messaging patterns, and NATS Jetstream, which builds upon Core NATS to add persistence, streaming, and more complex messaging capabilities.

NATS Core

NATS Core offers a simple, high-performance publish-subscribe messaging system.

  • Standard Messaging API: Provides at-most-once delivery. It’s very fast and suitable for scenarios where occasional message loss is acceptable or handled by the application.
  • Queue Groups:
    • NATS Core allows subscribers to form a queue group by subscribing to the same subject with the same queue name.
    • The NATS server then delivers each message published to that subject to exactly one member of the queue group, chosen at random.
    • Queue groups in Core NATS are dynamic and configured purely on the client side, with no server-side stream or consumer definitions (see the sketch after this list).
      • Subscription Syntax Example:
              nc.subscribe("subject.name", { queue: "myProcessingQueue", callback: ... })
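
A fuller sketch of a Core NATS queue group using the nats.js client (the subject, queue, and server names are illustrative):

import { connect, StringCodec } from "nats";

const nc = await connect({ servers: "localhost:4222" });
const sc = StringCodec();

// Every subscriber that uses the same queue name joins the same queue group;
// the server delivers each message to exactly one member of the group, so
// running several copies of this process load-balances the work.
const sub = nc.subscribe("tasks.resize", { queue: "resizers" });
(async () => {
  for await (const msg of sub) {
    console.log(`processing ${sc.decode(msg.data)}`);
  }
})();

// Publishers are unaware of queue groups; they just publish to the subject.
nc.publish("tasks.resize", sc.encode("image-42.png"));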

NATS Jetstream

Jetstream is a persistence layer built into NATS, offering streaming, message replay, and at-least-once delivery semantics. It’s designed for scenarios requiring higher durability and reliability.

  • Builds on Top of Core: Jetstream leverages the performance and simplicity of Core NATS while adding powerful features.
  • Key Concepts:
    • Streams: Persistent storage for messages. Messages are appended to a stream.
      • Creation: Streams are typically defined and created by applications at initialization if they don’t already exist.
        • Example Command:
                  nats stream add jobs \
                    --subjects "jobs.*.*" \
                    --storage=file \
                    --replicas=1 \
                    --retention=work \
                    --max-age="7d" \
                    --discard=new \
                    --max-msgs=50000 \
                    --max-bytes=500MB \
                    --ack \
                    --description="stream for background job processing"
      • Retention Policies: Define how long messages are kept (e.g., limits based on age/count/size, interest based on consumer activity, work queue semantics where messages are removed after acknowledgement).
      • Discard Policies: Determine what happens when a stream reaches its limits (e.g., new discards new messages if the limit is hit, old discards old messages). For the example nats stream add jobs command, discard=new means once limits are hit, new messages won’t be accepted, and old ones won’t be removed to make space, protecting existing unprocessed jobs.
      • Limits: Streams can have limits on the number of messages, total size, message age, etc.
      • Replay: Messages can be re-read multiple times without being removed from the stream (unless it’s a WorkQueue policy and the message is acknowledged).
    • Consumers: Stateful views into a stream, allowing applications to read and process messages.
      • Consumers are always attached to a stream and can filter messages from that stream by subject.
      • They maintain their own state regarding message delivery and acknowledgement, enabling features like message replay from a specific point.
      • Jetstream supports both push (server pushes messages to client) and pull (client requests messages) consumers.
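
As a sketch, a durable pull consumer for the jobs stream above could be created with the nats CLI roughly like this (names and limits are illustrative; the CLI may prompt for any settings not supplied):

nats consumer add jobs worker \
  --pull \
  --ack=explicit \
  --deliver=all \
  --max-deliver=5 \
  --wait=30s \
  --description="durable pull consumer shared by job workers"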

Messaging Patterns and Use Cases with Jetstream

1. Work/Task Queues (with Jetstream)

Ideal for distributing tasks among a pool of workers, ensuring that each task is processed at least once.

  • Implementation:
    1. Define a Stream: Create a Jetstream stream with a WorkQueue retention policy. This means messages are removed from the stream once successfully acknowledged by a consumer.
    2. Write Workers: Develop worker applications that subscribe to this stream using a consumer.
    3. Job Status: Application logic can maintain job status in an external database if needed (e.g., pending, processing, completed, failed).
  • Dead-Letter Queue (DLQ) Implementation:
    • Mechanism: When a message repeatedly fails processing and reaches its maximum delivery attempts (configured on the consumer), Jetstream doesn’t automatically move it to a DLQ. Instead, it publishes an advisory message.
    • Advisory Subject: This advisory is published on a specific subject, typically: $JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES.<STREAM>.<CONSUMER>.
    • Advisory Content: The advisory message contains metadata about the failed message, including the original stream name and the stream_seq (sequence number of the failed message in the original stream).
    • Creating a DLQ Stream:
      1. Create a separate Jetstream stream (e.g., jobs_dlq) that subscribes to the advisory subject pattern (e.g., $JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES.jobs.*); see the command sketch after this list.
      2. This jobs_dlq stream will then store these advisory messages.
    • Handling Failed Messages:
      • The original failed message remains in the jobs stream until it’s manually deleted, acknowledged (e.g., by a special DLQ handler that decides to ack it after investigation), or expires due to stream retention policies (like max-age).
      • A separate process (or a dedicated consumer on the DLQ stream) can monitor the DLQ stream.
      • When a message appears in the DLQ stream (i.e., an advisory is received), this monitor can use the stream_seq from the advisory to look up the actual data of the failed message from the original jobs stream for inspection, manual retry, or archival.
    • Stream Configuration for DLQ Behavior: The --max-age (e.g., “7d”) combined with the WorkQueue retention policy on the primary jobs stream helps in eventually cleaning up messages that were processed or have been in the stream for too long, even if they ended up in a conceptual DLQ.
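
A sketch of creating such a DLQ stream with the nats CLI, assuming the jobs stream above (values are illustrative; note the single quotes so the shell does not expand $JS):

nats stream add jobs_dlq \
  --subjects '$JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES.jobs.*' \
  --storage=file \
  --replicas=1 \
  --retention=limits \
  --max-age=30d \
  --description="advisories for jobs that exhausted their delivery attempts"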

2. Publish/Subscribe (Pub/Sub) & Fanout

Jetstream enhances NATS Core pub/sub by adding persistence.

Implementation: A stream can capture all messages published to a subject (or a set of wildcard subjects). Multiple consumers, potentially with different durable names or as ephemeral consumers, can subscribe to this stream. Each consumer will receive all messages from the stream according to its filter and starting position. This is useful for fanning out events to various microservices.
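
A rough fanout sketch with the nats.js simplified consumer API, assuming an events stream and two pre-created durable consumers (email-service and audit-service, both illustrative names); each consumer independently receives every message:

import { connect } from "nats";

const nc = await connect({ servers: "localhost:4222" });
const js = nc.jetstream();

// Each durable consumer keeps its own cursor into the stream, so both
// services see every message, independently of each other.
for (const durable of ["email-service", "audit-service"]) {
  const consumer = await js.consumers.get("events", durable);
  const messages = await consumer.consume();
  (async () => {
    for await (const m of messages) {
      console.log(`${durable} got seq ${m.seq} on ${m.subject}`);
      m.ack();
    }
  })();
}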

3. Key-Value (KV) Store

Jetstream provides a built-in Key-Value store.

  • Functionality: Offers a simple, distributed KV store built on top of streams. It can be a lightweight alternative to systems like Redis for certain use cases, such as configuration management or session storage.
  • Each key is a subject, and the stream stores the latest value for each key, with options for history.
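
A small sketch of the KV API in nats.js (bucket and key names are made up):

import { connect, StringCodec } from "nats";

const nc = await connect({ servers: "localhost:4222" });
const js = nc.jetstream();
const sc = StringCodec();

// Creates the bucket if it does not exist, otherwise binds to it.
const kv = await js.views.kv("app-config", { history: 5 });

await kv.put("feature.dark_mode", sc.encode("enabled"));

const entry = await kv.get("feature.dark_mode");
if (entry) {
  console.log(`value: ${sc.decode(entry.value)}, revision: ${entry.revision}`);
}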

4. Persistent Log Stream

Jetstream can be used to create persistent, append-only logs, similar in concept to Kafka topics.

  • Implementation: Create a stream with appropriate retention policies (e.g., keep messages for a long duration or up to a certain size). Applications publish log events to this stream. Different consumers can then process these logs for monitoring, auditing, or analytics.
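
For example, a limits-based event log stream might look roughly like this (subjects, retention, and sizes are illustrative):

nats stream add events \
  --subjects "events.>" \
  --storage=file \
  --replicas=1 \
  --retention=limits \
  --max-age=30d \
  --max-bytes=2GB \
  --description="append-only event log"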

Understanding Consumers in NATS Jetstream

A view into the stream

Consumers are fundamental to how data is accessed and processed from Jetstream streams.

  • Types of Consumers:
    • Ephemeral Consumers:
      • These consumers are temporary and are automatically deleted when the subscribing application disconnects.
      • They do not have a persistent state across application restarts.
    • Durable Consumers:
      • Named: Identified by a unique name within the stream.
      • Stateful: Jetstream persists the state of durable consumers, including which messages they have acknowledged.
      • Resilient: When an application instance subscribes using an existing durable name, it picks up processing from where the “group” (or the last instance using that durable name) left off.
      • Independent Existence: A durable consumer’s configuration and state exist on the server even if no application instances are currently connected to it. Clients attach to the latest state of the consumer.
  • Why Durable Consumers are Needed (and their relation to persisted stream vs. persisted consumer state):
    • While the stream persists the messages, the durable consumer persists the processing state for a particular logical consumer or group of consuming instances.
    • This persisted consumer state (e.g., which messages have been acknowledged, which are currently inflight) is crucial for:
      • Fault Tolerance: If a worker instance crashes, another can take over using the same durable name and resume from the last acknowledged message, so work is not lost and already-acknowledged messages are not reprocessed (for WorkQueue scenarios).
      • Scaling: Multiple worker instances can subscribe using the same durable consumer name. Jetstream will then distribute messages from the stream among these instances, effectively creating a load-balanced queue group. Each instance processes a subset of the messages.
      • Decoupling: The consumer’s existence is decoupled from the worker processes. You can define consumers even before workers are deployed.
  • Scaling Workers with Durable Consumers

    • Multiple Worker Instances (Separate Processes, Same Durable Consumer Name):
      • This is the primary and idiomatic way to scale Jetstream workers for task processing.
      • Each worker instance subscribes to the stream using the same durable consumer name.
      • Jetstream load balances messages from the stream across these worker instances.
      • This forms an implicit queue group behavior, ensuring each message is processed by only one worker in the group.
    • Concurrency Within a Single Worker Instance:
      • A single worker can process multiple messages concurrently using asynchronous operations (e.g., async/await in TypeScript with Promise.all).
      • The consumer’s MaxAckPending setting controls how many messages the server will deliver to the client without receiving an acknowledgement. The worker can therefore have up to MaxAckPending messages in flight and process them concurrently (see the sketch after this list).
    • Multiple js.subscribe() Calls in the Same Process:
      • Scenario A: Different Durable Names (or Ephemeral Consumers):
        • Use Case: If you want different logical processing paths for the same message within the same worker instance, or if one part of the application needs to see all messages for one purpose, and another part for a different purpose, starting independently. This is less common for simple job processing where a single pool of workers shares a job queue.
      • Scenario B: Same Durable Name:
        • Caution: Generally not recommended for scaling within a single process. A shared durable consumer is meant to distribute work across different application instances/processes. Using the same durable name for several subscriptions inside one process leads to unpredictable message distribution between those internal subscriptions, and is usually less effective than handling concurrency within a single subscription’s message handler (using MaxAckPending and async processing).
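
A sketch of setting such a cap programmatically with the Jetstream manager API in nats.js (stream and consumer names are illustrative; the worker loop itself is shown under Example Usage below):

import { connect, AckPolicy } from "nats";

const nc = await connect({ servers: "localhost:4222" });
const jsm = await nc.jetstreamManager();

// Illustrative durable consumer for the "jobs" stream: the server will not
// deliver more than 16 unacknowledged messages at a time, which caps how
// many handlers a worker can have in flight concurrently.
await jsm.consumers.add("jobs", {
  durable_name: "worker",
  ack_policy: AckPolicy.Explicit,
  max_ack_pending: 16,
});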

Relationship Between Queue Groups (Core NATS) and Jetstream Durable Consumers

Different perspectives on groups

  • Core NATS Queue Groups:
    • Client-side configuration where multiple subscribers use the same subject and queue name.
    • The server distributes messages among them for that specific subject.
    • No server-side persisted state for the queue group itself (beyond the subscriptions).
  • Jetstream Durable Consumers (Implicit Queue Groups):
    • When multiple application instances subscribe using the same durable consumer name to a Jetstream stream, they form a group that processes messages from that stream in a queue-like fashion.
    • This is a server-side concept where the durable consumer’s state (acks, pending messages) is managed by Jetstream, providing robust, load-balanced processing. This is the most common way to achieve queueing with Jetstream.
  • Jetstream deliver_group (for Push Consumers):
    • A Jetstream push consumer can be configured with a deliver_group name.
    • When set, the Jetstream consumer pushes messages to its deliver_subject, and if your applications subscribe to this deliver_subject using Core NATS queue group semantics (i.e., same queue name in their subscription options), they will form a Core NATS queue group for messages delivered by that specific Jetstream push consumer.
    • This is a more advanced pattern, often used for bridging Jetstream to existing Core NATS queue subscribers or for specific fan-out/fan-in scenarios after initial stream processing. For most worker scaling, directly using shared durable consumers is simpler.

The term “consumer group” or “group” in the context of Jetstream typically refers to the collection of worker instances that are all using the same durable consumer name, thereby sharing the load of messages from a stream. “Delivery group” usually refers to the deliver_group option on a Jetstream consumer, which ties into Core NATS queue groups.

Purpose of groups

  • Load balancing: scaling message processing.
  • High availability (HA): members can cleanly disconnect and reconnect, or new members can join, without interrupting processing.

In Core NATS subscriptions, why would you not always use queue groups?

  • The main reason is that concurrent message processing gives up message ordering.
  • For eventing/streaming use cases, messages may need to be processed in the order they were published.
  • If using Jetstream, we can enforce ordering by setting MaxAckPending=1 on the consumer (i.e. max_ack_pending: 1 in the consumer configuration).

Key Considerations

  • Idempotency: Ensure your message handlers are idempotent, especially with at-least-once delivery, so that a message processed more than once (e.g., redelivered after a missed acknowledgement) does not cause unintended side effects (see the sketch after this list).
  • Message Design: Design clear and versioned message schemas.
  • Error Handling: Implement robust error handling and retry logic within workers. Use DLQs for messages that consistently fail.
  • Monitoring: Monitor stream depths, consumer lag, and processing rates. NATS provides monitoring endpoints.
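
Two complementary angles on idempotency, as a hedged sketch (jobStore is a hypothetical stand-in for your database): Jetstream deduplicates publishes carrying the same Nats-Msg-Id header within the stream’s duplicate window, and the handler itself can skip jobs it has already completed.

import { connect, StringCodec } from "nats";

const nc = await connect({ servers: "localhost:4222" });
const js = nc.jetstream();
const sc = StringCodec();

// Publish side: retried publishes with the same msgID inside the duplicate
// window are dropped by the server, so the job is not enqueued twice.
await js.publish("jobs.email.send", sc.encode(JSON.stringify({ jobId: "job-123" })), {
  msgID: "job-123",
});

// Hypothetical stand-in for a real table of completed jobs.
const jobStore = {
  done: new Set<string>(),
  async isDone(id: string) { return this.done.has(id); },
  async markDone(id: string) { this.done.add(id); },
};

// Consumer side: a redelivered message becomes a no-op instead of a
// duplicate side effect.
async function handleJob(jobId: string, run: () => Promise<void>) {
  if (await jobStore.isDone(jobId)) return;
  await run();
  await jobStore.markDone(jobId);
}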

Example Usage

NodeJS

Basic Worker

// Inside your main worker loop (simplified).
// Setup sketch (assumes nats.js 2.x, a "jobs" stream, and a durable pull
// consumer named "worker"; adjust the names to your deployment):
//   import { connect } from "nats";
//   const nc = await connect({ servers: "localhost:4222" });
//   const js = nc.jetstream();
//   const consumer = await js.consumers.get("jobs", "worker");
//   const messages = await consumer.consume();
 
(async () => {
  for await (const m of messages) {
    console.log(`Received msg seq ${m.seq}`);
 
    // Call the handler, but DON'T await it here.
    // This allows the loop to continue and fetch/process the next message
    // while this one is being handled asynchronously.
    handleIncomingMessage(m) // handleIncomingMessage is an async function
      .then(() => {
        // Optional: Log success at this level if not done within handleIncomingMessage
        // console.log(`Successfully processed and acked ${m.seq}`);
      })
      .catch(async (err) => {
        // CRITICAL: You MUST handle errors from the un-awaited promise.
        // Otherwise, an unhandled rejection here can crash your worker.
        console.error(`Error processing message ${m.seq} in background:`, err);
        // Decide what to do with the message 'm' based on the error.
        // It might still be "in flight" if handleIncomingMessage didn't ack/nak/term it.
        // If the error means the message object 'm' itself is now invalid or
        // if handleIncomingMessage is guaranteed to have already acked/nacked/termed,
        // you might not need to do anything with 'm' here.
        // However, if handleIncomingMessage could throw *before* interacting with 'm',
        // you might need to m.nak() or m.term() here as a fallback.
        // This depends on the robustness of handleIncomingMessage's own error handling.
        // A common pattern is for handleIncomingMessage to be responsible for ack/nak/term.
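        // NOTE: whether the client exposes isAcknowledged() depends on the library version; adjust this ack-state check to your client's API.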
        if (!m.isAcknowledged()) { // Check if it wasn't already handled
             try {
                // Example: Nak it for redelivery after a delay
                console.warn(`Nak'ing message ${m.seq} due to unhandled error in processing.`);
                m.nak(5000); // Redeliver after 5 seconds
             } catch (nakErr) {
                console.error(`Failed to NAK message ${m.seq}:`, nakErr);
             }
        }
      });
  }
})();

The following approach is typically used when you have a known batch of asynchronous tasks that you want to execute concurrently and then wait for all (or some) of them to complete before proceeding. It’s less directly applicable to the top-level NATS message consumption loop (which is a continuous stream), but it is very useful inside your handleIncomingMessage function when a single job involves multiple independent async sub-tasks (a sketch of wiring this into handleIncomingMessage follows the code below).

async function handleComplexTask(payload: any, msgAckControls: { ack: () => void; nak: () => void }) {
  try {
    console.log("Starting complex task. Fetching data from two sources...");
    const [userData, productData] = await Promise.all([
      fetchUserData(payload.userId),   // async function
      fetchProductInfo(payload.productId) // async function
    ]);
 
    console.log("Data fetched. Processing...", userData, productData);
    await processCombinedData(userData, productData); // async function
 
    console.log("Complex task complete.");
    msgAckControls.ack();
  } catch (error) {
    console.error("Error in complex task:", error);
    msgAckControls.nak();
  }
}
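
For instance, the handleIncomingMessage used in the Basic Worker above could wire the Jetstream message’s ack controls into handleComplexTask roughly like this (a sketch assuming the JsMsg type from nats.js):

import type { JsMsg } from "nats";

async function handleIncomingMessage(m: JsMsg): Promise<void> {
  const payload = JSON.parse(new TextDecoder().decode(m.data));
  await handleComplexTask(payload, {
    ack: () => m.ack(),
    nak: () => m.nak(5000), // illustrative: ask for redelivery after 5s
  });
}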