
System Design Interview and beyond Note5 - Queues

The importance of queues in distributed systems

Queue

Bounded and unbounded queues

Circular buffer and its applications

  1. Messages can be stored on disk or in memory.
  2. A queue is FIFO: first in, first out.
  3. Unbounded queue: can grow to use as much memory as is available.
  4. Bounded queue: limited-size queue; the size is specified at creation.
  5. Implementation: we need a data structure that keeps ordering and supports insert and delete in constant time:
    1. Linked list: keep pointers to the head and the tail.
    2. Array: appending is easy, but removing from the front shifts all remaining elements, so the operation is O(n).
  6. In the array case deletion is slow, so we look for a way to avoid the shifts (trading space for time) -> circular buffer (ring buffer).
  7. A circular buffer maintains two pointers, read and write, which point to the next slot to be read and written; when a pointer reaches the end of the array, it wraps back to 0 (moves in a circle).
  8. The buffer is full when the write pointer catches up with the read pointer -> we can either start overwriting the oldest data or throw an exception (see the sketch after this list).
  9. Applications of ring buffers -> they guarantee a hard bound on memory utilization.
  10. Another application is logging -> writing a log entry to disk on every call to the logger is slow -> instead, write the entry to a ring buffer in memory and sync the data to disk in the background periodically.
  11. Another example: Hadoop MapReduce. For each input record, new key/value pairs are generated; writing them straight to HDFS would cause additional replication and I/O and may be slow -> so the key/value pairs are written to an in-memory ring buffer, and when the buffer fills up they are spilled (copied from the memory buffer to disk).
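
A minimal ring buffer sketch in Java, roughly matching items 7 and 8 above (the class name `RingBuffer` and the overwrite-on-full policy are illustrative choices, not from the note):

```java
// Fixed-size array with read/write positions that wrap around ("move as a circle").
// When the buffer is full, this sketch overwrites the oldest element; throwing an
// exception instead is an equally valid policy.
public class RingBuffer<T> {
    private final Object[] buffer;
    private int readPos = 0;   // next slot to read
    private int writePos = 0;  // next slot to write
    private int size = 0;      // number of stored elements

    public RingBuffer(int capacity) {
        buffer = new Object[capacity];
    }

    public void put(T element) {
        buffer[writePos] = element;
        writePos = (writePos + 1) % buffer.length;   // wrap around at the end
        if (size == buffer.length) {
            readPos = (readPos + 1) % buffer.length; // full: drop the oldest element
        } else {
            size++;
        }
    }

    @SuppressWarnings("unchecked")
    public T take() {
        if (size == 0) {
            return null;                             // empty buffer
        }
        T element = (T) buffer[readPos];
        readPos = (readPos + 1) % buffer.length;
        size--;
        return element;
    }
}
```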

Full and empty queue problems

Load shedding

Rate limiting

What to do with failed requests

Backpressure

Elastic scaling

  1. Two corner cases for a bounded queue: a full queue (arrival rate > consumption rate) and an empty queue (arrival rate < consumption rate).
  2. Full queue

    1. Drop incoming messages -> load shedding, rate limiting
    2. Force producers to slow down -> backpressure
    3. Scale consumers up so they consume faster -> elastic scaling
  3. Load shedding

    1. On a single machine, when the queue is full, the process that writes to the queue starts dropping messages (with or without telling the producer).

    2. In a distributed system, the write process of the single-machine case becomes a monitoring system: it watches CPU load, memory utilization, average response latency, etc., and when (for instance) CPU utilization gets too high, machines in the cluster start dropping messages.


  4. Rate limiting: every producer gets a quota limiting how many messages it can send to the broker (a sketch follows this list).

    1. Single machine: count the number of requests in memory.

    2. Distributed system: an external cache stores the rate-limiting counters.


  5. The broker should tell the producer that it is busy; the producer can then handle the rejected messages in several ways:

    1. Ignore the failure and do nothing;
    2. Buffer the messages (in memory or on disk) and send them later;
    3. Propagate the exception further up the stack (for sync calls in a chain);
    4. Send the messages over the limit to temporary storage (another broker or system);
    5. Retry immediately -> bad -> causes a retry storm.
  6. If the producer does not know how to handle this situation (a full queue), the broker can tell it to slow down -> backpressure.

    1. Single machine: limit the number of messages being processed from the producer; when all processing threads are busy and blocked, the producer cannot send more messages and is thus back-pressured.
    2. Distributed system: the load balancer distributes the load, but when all machines are full, the LB tells the client it cannot accept more messages -> the producer can buffer the messages, propagate the information further up, fall back to a different system, or drop the messages.
  7. Elastic scaling

    1. Single machine: introduce a dynamic thread pool -> semaphores can help signal when more threads are needed.
    2. Distributed system: dynamically add more consumers -> the competing consumers pattern.
  8. Empty queue:

    1. How do we read elements from the queue? A scheduled job or a polling loop -> but when the queue is empty, they keep polling it (even though there is nothing to read).
    2. Configure the loop or scheduled job to run less often -> helps, but creates a new problem -> message processing latency increases.
  9. The broker pushes messages when they become available -> WebSocket.

  10. The broker blocks the pull request and waits for messages -> long polling (see the sketches after this list).
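
A minimal single-machine sketch of rate limiting (item 4): a fixed-window counter per producer. The class and method names (`FixedWindowRateLimiter`, `tryAcquire`) and the window-reset strategy are assumptions for illustration; real systems often use token buckets or sliding windows, and the distributed variant keeps these counters in an external cache instead of local memory.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Fixed-window rate limiter: each producer may send at most `limit` messages per window.
public class FixedWindowRateLimiter {
    private final int limit;
    private final long windowMillis;
    private final Map<String, AtomicInteger> counters = new ConcurrentHashMap<>();
    private long windowStart = System.currentTimeMillis();

    public FixedWindowRateLimiter(int limit, long windowMillis) {
        this.limit = limit;
        this.windowMillis = windowMillis;
    }

    public synchronized boolean tryAcquire(String producerId) {
        long now = System.currentTimeMillis();
        if (now - windowStart >= windowMillis) {
            counters.clear();    // new window: reset all counters
            windowStart = now;
        }
        int count = counters.computeIfAbsent(producerId, id -> new AtomicInteger()).incrementAndGet();
        return count <= limit;   // messages over the quota are rejected (or shed)
    }
}
```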
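
And a sketch contrasting busy polling with long polling on the consumer side (items 8-10), using `java.util.concurrent.BlockingQueue`; the queue name `broker` and the 30-second timeout are illustrative:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class PollingDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> broker = new ArrayBlockingQueue<>(100);

        // Busy polling: poll() returns null immediately when the queue is empty,
        // so a loop around it keeps spinning and burning CPU.
        // String msg = broker.poll();

        // Long polling: the call blocks until a message arrives or the timeout expires,
        // so an empty queue costs almost nothing and latency stays low when messages appear.
        String msg = broker.poll(30, TimeUnit.SECONDS);
        System.out.println(msg == null ? "no message within 30s" : "got: " + msg);
    }
}
```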

Start with something simple

Similarities between single-machine and distributed-system concepts

Interview tips

  1. When you don't know where to start, start simple (the single-machine model).
  2. Start by outlining the key components of the solution, thinking in terms of classes, libraries, and threads -> this gives you clues about which components can become standalone services in the distributed world.
  3. Bottom up

Blocking queue and producer-consumer patterns

Producer-consumer pattern

Wait and notify pattern

Semaphores

Blocking queue applications

  1. A blocking queue is a queue that provides blocking put and take methods: when the queue is full it blocks the put operation, and when it is empty it blocks the take operation.

  2. A blocking queue can help implement backpressure, long polling, and the producer-consumer pattern (see the sketches after this list).

  3. The producer-consumer pattern can be implemented without a blocking queue, but that requires lower-level concurrency techniques.

    1. Wait and notify: wait pauses the execution of a thread, while notify wakes up a waiting thread.
    2. When the queue is full, the producer calls wait; when the consumer retrieves a message from the queue, it calls notify to wake the waiting producer thread.
    3. When the queue is empty, the consumer calls wait; the producer calls notify after adding a message.
  4. Alternatively, use semaphores (essentially counters) to implement the producer-consumer pattern.

    1. What: a semaphore holds a set of permits; a thread can acquire a permit if one is available (decreasing the count by one), and when the thread releases the permit the count increases by one.
    2. To implement the producer-consumer pattern: use two semaphores, one counting the occupied slots and one counting the empty slots.
      1. When a new element is added to the queue, occupied increases by one and empty decreases by one.
  5. Application: a notification system. When we have a set of ready-to-send messages, each message is sent to one or more subscribers.


  6. Application: a data aggregation service over a stream of events, e.g. counting views on YouTube: aggregate data in memory and push it to a blocking queue.


  7. Thread pool
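
A minimal producer-consumer sketch with `java.util.concurrent.ArrayBlockingQueue` (items 1 and 2): `put` blocks when the bounded queue is full, which is exactly the backpressure behaviour described above, and `take` blocks when it is empty. The queue size and message values are arbitrary.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ProducerConsumerDemo {
    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10); // bounded queue

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 100; i++) {
                    queue.put(i); // blocks when the queue is full -> backpressure on the producer
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    Integer message = queue.take(); // blocks when the queue is empty
                    System.out.println("consumed " + message);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
    }
}
```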
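
The same bounded queue built directly on wait/notify (item 3); the class name `WaitNotifyQueue` is illustrative, and the `while` loops plus `notifyAll` guard against spurious wakeups and against waking the wrong kind of thread:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// A tiny blocking queue built from wait/notify.
public class WaitNotifyQueue<T> {
    private final Deque<T> items = new ArrayDeque<>();
    private final int capacity;

    public WaitNotifyQueue(int capacity) {
        this.capacity = capacity;
    }

    public synchronized void put(T item) throws InterruptedException {
        while (items.size() == capacity) {
            wait();               // queue full: producer pauses
        }
        items.addLast(item);
        notifyAll();              // wake up waiting consumers
    }

    public synchronized T take() throws InterruptedException {
        while (items.isEmpty()) {
            wait();               // queue empty: consumer pauses
        }
        T item = items.removeFirst();
        notifyAll();              // wake up waiting producers
        return item;
    }
}
```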
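
And the semaphore variant (item 4): two counting semaphores track the empty and occupied slots, and a third binary semaphore protects the underlying deque; all names here are illustrative assumptions:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.Semaphore;

// Bounded buffer built from two counting semaphores (empty and occupied slots).
public class SemaphoreQueue<T> {
    private final Deque<T> items = new ArrayDeque<>();
    private final Semaphore emptySlots;                        // permits = free capacity
    private final Semaphore occupiedSlots = new Semaphore(0);  // permits = stored elements
    private final Semaphore mutex = new Semaphore(1);          // protects the underlying deque

    public SemaphoreQueue(int capacity) {
        emptySlots = new Semaphore(capacity);
    }

    public void put(T item) throws InterruptedException {
        emptySlots.acquire();      // blocks when the queue is full
        mutex.acquire();
        items.addLast(item);
        mutex.release();
        occupiedSlots.release();   // one more element available to consumers
    }

    public T take() throws InterruptedException {
        occupiedSlots.acquire();   // blocks when the queue is empty
        mutex.acquire();
        T item = items.removeFirst();
        mutex.release();
        emptySlots.release();      // one more free slot for producers
        return item;
    }
}
```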

Thread pool

Pros and cons of thread pool

CPU-bound and I/O-bound tasks

Graceful shutdown

  1. Thread per request: process requests concurrently -> increases throughput.
  2. But creating and stopping threads is expensive; when too many requests come in -> OOM.
  3. Even if OOM does not happen, the CPU may be exhausted -> long-running tasks occupy the CPU and other threads get no CPU time -> thread starvation.
  4. We need to manage threads: reuse them and make sure only a bounded number exist.
  5. Thread pool:
    1. Increases performance -> lower latency -> a fixed number of threads, without constant creation and termination.
    2. Makes the application stable and predictable.
    3. Simplifies coding (no need to manage the thread lifecycle; focus on the task).
    4. Choosing the size of the thread pool is hard (too small is useless, too large causes OOM / CPU starvation).
    5. Long-running tasks can clog the thread pool -> manage them with timeouts (fail or re-queue).
  6. Sizing:
    1. CPU-bound: size = CPU count + 1 (most of the time not all CPUs are fully busy).
    2. I/O-bound: size = CPU count * (1 + wait time / service time).
    3. In practice: run a load test and watch the CPU utilization.
  7. Graceful shutdown (see the sketch after this list):
    1. Don't accept new threads/messages.
    2. Wait for running tasks to complete.
    3. (A maximum wait time is specified to avoid waiting forever.)
    4. Cancel the remaining tasks (making sure no task is lost).
  8. The graceful shutdown algorithm can be applied to many things: stop receiving new requests/tasks and allow existing jobs to finish.
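
A sketch combining the I/O-bound sizing rule of thumb (item 6) with a graceful shutdown of a `java.util.concurrent.ExecutorService` (item 7); the 50 ms / 10 ms wait/service times and the 30-second grace period are assumptions for illustration:

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class GracefulShutdownDemo {
    public static void main(String[] args) throws InterruptedException {
        int cpus = Runtime.getRuntime().availableProcessors();
        // I/O-bound rule of thumb: size = CPU count * (1 + wait time / service time).
        // Assume each task waits ~50 ms on I/O for every ~10 ms of CPU work -> cpus * 6 threads.
        int poolSize = cpus * (1 + 50 / 10);
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);

        // ... submit tasks with pool.submit(...) ...

        // Graceful shutdown:
        pool.shutdown();                                      // 1. stop accepting new tasks
        if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {   // 2. bounded wait for running tasks
            List<Runnable> neverStarted = pool.shutdownNow(); // 3. cancel whatever is left
            System.out.println(neverStarted.size() + " queued tasks were never started");
        }
    }
}
```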

Big compute architecture

Batch computing model

Embarrassingly parallel problems

  1. The thread pool pattern can be mapped to large scale -> pools of VMs perform parallel computation across thousands of CPUs.
  2. This model works well when tasks are homogeneous (all alike), arrive at roughly the same rate (a stream of tasks), and are independent and small.
  3. But when tasks are heterogeneous (some consume far more resources), coupled, represent batch jobs, or take a long time to run -> batch computing.
  4. Batch computing normally has a job scheduler/definer that sends jobs to a coordinator, and the coordinator dispatches them to different VM pools -> AWS Batch / Azure Batch services.
  5. Big compute / high-performance computing (HPC) -> for solving embarrassingly parallel problems:
    1. Large-volume data processing -> e.g. trading transactions, clickstream events, application log files -> analysis that does not need to happen in real time.
    2. Building thousands of ML models: e.g. Netflix trains a personalized homepage for each customer.
    3. Performing computationally intensive operations: e.g. weather forecasting, climate simulation, drug discovery, analysis of genomic sequences.
  6. MapReduce vs Big Compute