
System Design Interview and beyond 10 - Deliver Data at large scale

Partition

How to scale message consumption

Single consumer vs multiple consumers

Problems with multiple consumers (order of message processing, double processing)

  1. Scale horizontally (add more machines) or vertically (use a better machine)

  2. Single consumer: message order is preserved, but it is unreliable (restarts, network issues, ...);

  3. Multiple consumers: high availability; but messages can be processed out of order, and there is a higher chance of processing the same message multiple times.

  4. Out-of-order issue demonstration (push model):

    1. We have a queue and two consumers, 1 and 2; messages are dispatched in a round-robin manner;

    2. We have three messages A, B and C. A goes to consumer 1, B to consumer 2, ...

    3. One day message A is processed by consumer 1 more slowly than B is processed by consumer 2, where A is "create booking" and B is "cancel booking".

    4. So the cancel is applied before the create, and the booking ends up not canceled.

      image-20240324092450273

  5. Double processing (pull-based):

    1. We have a queue and two consumers, 1 and 2, and three messages A, B and C.

    2. Consumer 1 pulls A and consumer 2 pulls B; the messages are marked invisible.

    3. However, consumer 1 processes message A but fails to send the ack back to the queue;

    4. So after the visibility timeout, A becomes visible again and is pulled by consumer 2;

    5. Same example for messages A and B: A is "create booking" and B is "cancel booking". Consumer 1 creates the booking, consumer 2 cancels it.

    6. But consumer 2 then processes A again, so the booking is created again.

      image-20240324092932241 image-20240324092948190

  6. One solution is a deduplication cache shared by all consumers (see the sketch after this list).

  7. One more example with multiple consumers, log-based messaging:

    1. Each consumer manages its own offset and messages are not immediately deleted -> no competing consumers on a single queue
  8. Solution: data partitioning (sharding): multiple queues, one per consumer

    1. The order of messages across queues doesn't matter, but the order of messages within a queue does and should follow some rule (e.g., per single user)
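A minimal sketch of the deduplication-cache idea from point 6. The shared `set` plus a lock stands in for a cache shared by all consumers (in practice something like Redis with a TTL); the message IDs and payloads are made up for illustration.

```python
import threading

class DeduplicatingConsumer:
    """Consumer that skips messages whose IDs it has already seen."""

    def __init__(self, shared_seen: set, lock: threading.Lock):
        # `shared_seen` plays the role of the deduplication cache shared by all consumers.
        self.seen = shared_seen
        self.lock = lock

    def handle(self, message_id: str, payload: str) -> None:
        with self.lock:
            if message_id in self.seen:
                return  # duplicate delivery (e.g., after a visibility timeout); drop it
            self.seen.add(message_id)
        print(f"processing {message_id}: {payload}")

# Two consumers sharing one deduplication cache.
seen, lock = set(), threading.Lock()
consumer_1 = DeduplicatingConsumer(seen, lock)
consumer_2 = DeduplicatingConsumer(seen, lock)
consumer_1.handle("A", "create booking")
consumer_2.handle("A", "create booking")  # redelivered A is ignored, so no double processing
```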

Partitioning in real life systems

Pros and cons

Applications

  1. Pros and cons

    1. High scalability (add shards as data grows)
    2. Performance (shards process requests in parallel)
    3. Availability (if one shard fails, the others continue serving)
  2. Applications

    1. Messaging system - RabbitMQ - sharded queues

      Producers send messages to a single logical queue, which is partitioned into multiple regular queues (shards). Each shard has a single consumer. Total order is lost, but order is preserved at the shard level.

      image-20240324095018824

    2. Log-based messaging systems - Kafka, Kinesis

      Incoming messages are divided into partitions, and each partition is processed by its own consumer -> no competing consumers -> so to parallelize message processing, we create more partitions.

      image-20240324095229572

    3. Database

      Each shard (also called a node) holds a portion of the data. -> New issues arise:

      • Which node should we write data to?
      • How do we pick a shard when reading data?
      • How do we ensure data is evenly distributed among the nodes?
      • What does evenly distributed mean? -> the same volume of data, or the same rate of read and write requests.

      image-20240324095520219

      1. How to partition data

        1. Configuration service: knows how many shards exist and which data lives on which shard -> a client request goes to a request router, and the router consults the configuration service to decide which shard to use -> used by SQL and NoSQL databases (see the sketch after this list)

        2. Avoid the additional component: let nodes communicate with each other about how to split data and route requests -> Dynamo and Cassandra

          image-20240324100111309

    4. Sharding can also be used to build a distributed cache

      1. Database shard -> cache shard
      2. Data is stored in memory
    5. Object storage -> S3, Google Cloud Storage, Azure Blob, ...

      1. When we upload a file, one node is selected and the file is stored there;

      2. Metadata is stored in the configuration service.

      3. When we get a file, the request router checks the metadata service and forwards the request to the right node.

        image-20240324100351470
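A rough sketch of the configuration-service approach described above for databases and object storage: a request router consults a (here, hard-coded) shard map to decide which node should serve a key. The shard names, IPs, and key ranges are invented for illustration.

```python
# Hypothetical configuration service data: which shard covers which key range,
# and which node currently hosts that shard.
CONFIG = {
    "shard-1": {"node": "10.0.0.1", "range": ("a", "m")},
    "shard-2": {"node": "10.0.0.2", "range": ("n", "z")},
}

def route(key: str) -> str:
    """Request router: find the shard whose range covers the key and return its node."""
    first_letter = key[0].lower()
    for shard, info in CONFIG.items():
        low, high = info["range"]
        if low <= first_letter <= high:
            return info["node"]
    raise KeyError(f"no shard covers key {key!r}")

print(route("alice"))  # -> 10.0.0.1
print(route("zoe"))    # -> 10.0.0.2
```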

Partition strategies

Lookup Strategy

Range strategy

Hash strategy

  1. Lookup strategy: create the mapping ourselves and assign a shard to each user (e.g., randomly). A combined sketch of all three strategies follows this list.

    1. Pro: control over the shard assignment logic (random, or based on shard utilization)
    2. Con: hard dependency on the mapping: for every read and write, we need to look up the shard that contains the user's data.
    3. New issues arise: how do we make the lookup highly available? Fast? Scalable? Consistent? -> As the data grows, it's common to partition the lookup table itself.

      image-20240324103059390
  2. Range strategy: each shard is responsible for a contiguous range of keys.

    1. Pros: easy to implement; works well with range queries (e.g., orders within a month)

    2. Cons: may provide suboptimal balancing between shards, which can lead to the hot shard problem -> a single shard gets much higher load than the others

      image-20240324103652799

  3. Hash strategy: take the key (e.g., the user's last name) and compute a hash of the string.

    1. Shard boundaries: space them evenly, or use consistent hashing
    2. Pro: helps distribute data more evenly across shards
    3. Cons: range queries perform poorly, and computing the hash imposes additional overhead

    image-20240324104001086
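A combined sketch of the three strategies, assuming four shards and a user's last name as the partition key; the lookup table contents, range boundaries, and function names are illustrative, not any particular database's API.

```python
import bisect
import hashlib

SHARDS = ["shard-0", "shard-1", "shard-2", "shard-3"]

# Lookup strategy: we own the mapping and can assign users to shards however we like.
LOOKUP_TABLE = {"smith": "shard-2", "jones": "shard-0"}

def lookup_shard(last_name: str) -> str:
    return LOOKUP_TABLE[last_name.lower()]

# Range strategy: each shard owns a contiguous range of keys (here, ranges of first letters).
RANGE_UPPER_BOUNDS = ["g", "m", "s", "z"]  # shard-0: a-g, shard-1: h-m, shard-2: n-s, shard-3: t-z

def range_shard(last_name: str) -> str:
    return SHARDS[bisect.bisect_left(RANGE_UPPER_BOUNDS, last_name[0].lower())]

# Hash strategy: hash the key and map the hash onto a shard.
def hash_shard(last_name: str) -> str:
    digest = int(hashlib.md5(last_name.lower().encode()).hexdigest(), 16)
    return SHARDS[digest % len(SHARDS)]

print(lookup_shard("Smith"))                      # whatever we assigned: shard-2
print(range_shard("Smith"), hash_shard("Smith"))  # range puts "S..." on shard-2; hash may differ
```

Range queries (e.g., all last names from "a" through "c") stay on one shard with the range strategy but scatter across shards with the hash strategy, which is the trade-off noted above.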

Request routing

Physical and virtual shards

Request routing options

  1. Physical machines vs. virtual shards: a single physical machine can host multiple virtual shards

  2. Request routing service: identifies the shard that stores the data for a specified key and forwards the request to the physical machine that hosts the identified shard -> requires a mapping

  3. The mapping contains the shard name and the IP of the machine the shard lives on. -> Where do we put this info?

  4. Option 1: a static file deployed to every machine

    1. Lacks flexibility: any update (number of shards, IPs, shard-to-IP assignment) requires a redeployment

      image-20240324105749216

  5. Option 2: shared storage, e.g., S3, with a daemon process that pulls the info from S3 periodically (see the sketch after this list).

    1. More flexible compared to option 1,

    2. but increases the complexity of the client;

    3. The daemon process is vital here; otherwise the mapping may become outdated and requests will be forwarded to the wrong shards

      image-20240324105948205

  6. To avoid updating clients -> proxy option and P2P option

    1. Proxy: the proxy holds the mapping; clients call the proxy, and the proxy calls the right shard
    2. P2P: the client calls a random node, and that node redirects the request to the right node (nodes know about each other)

    image-20240324110201016

  7. This mapping doesn't change frequently -> if we have a small number of servers, we can share the list via DNS; if we have many servers, we can put a load balancer between clients and servers.

  8. How do proxy machines find the shard servers on the network, and how do shard servers find each other? -> service discovery

    1. Static list of IP addresses of all shard machines, deployed to a small number of proxy machines (proxy option) or to the shard machines themselves (P2P option); use a daemon process to fetch the list from shared storage (S3); or share it through DNS
    2. Service registry (classic client-side discovery mechanism)
    3. Gossip protocol
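A sketch of option 5 above (shared storage plus a daemon process): a client-side router keeps a local copy of the shard-to-IP mapping and refreshes it in the background so it doesn't go stale after rebalancing. `fetch_mapping_from_shared_storage` is a placeholder for an S3 download; the refresh interval and mapping schema are made up.

```python
import threading

def fetch_mapping_from_shared_storage() -> dict:
    """Placeholder for pulling the shard-to-IP mapping from shared storage such as S3."""
    return {"shard-1": "10.0.0.1", "shard-2": "10.0.0.2"}

class ShardRouter:
    """Holds the shard-to-IP mapping locally and refreshes it periodically."""

    def __init__(self, refresh_seconds: float = 30.0):
        self.mapping = fetch_mapping_from_shared_storage()
        self._stop = threading.Event()
        # This daemon thread plays the role of the daemon process in the notes:
        # without it, the local mapping would eventually become outdated.
        self._thread = threading.Thread(
            target=self._refresh_loop, args=(refresh_seconds,), daemon=True
        )
        self._thread.start()

    def _refresh_loop(self, interval: float) -> None:
        while not self._stop.wait(interval):
            self.mapping = fetch_mapping_from_shared_storage()

    def address_for(self, shard: str) -> str:
        return self.mapping[shard]

router = ShardRouter(refresh_seconds=5.0)
print(router.address_for("shard-2"))  # -> 10.0.0.2
```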

Rebalancing partitions

How

  1. Why rebalance? -> uneven load (number of requests) on shard servers -> caused by uneven data distribution / hot keys / shard server failure (handover) -> rebalancing

  2. Option 1: split a shard when its size exceeds a limit -> automatically or manually -> clone the shard (while it keeps handling requests) and update the metadata

    image-20240324163133187

    • This is just a metadata change and doesn't cause immediate data migration between shard servers.
    • Whether we need to redistribute shards between servers is decided by another process, the balancer.
    • It's a background process that monitors the number of shards on each server. If the gap between the servers with the most and the fewest shards exceeds a limit, the balancer starts migrating shards to reach an equal number of shards per server.
    • Adding a new server to the cluster triggers the balancer to start the shard migration process;
    • Removing a server is the reverse process.
  3. Shards can also be split when adding a new server.

    image-20240324164044798

  4. Each shard owns a small range of hash keys, and the number of shards is fixed -> fixed-number-of-shards strategy (see the sketch after this list)

    1. Say we have 1024 shards distributed across 64 shard server machines; a shard may grow bigger, but we can move it to a machine holding smaller shards
    2. When a new machine is added to the cluster, every machine moves some shards to the new machine
    3. Simple to implement and maintain
    4. Hard to choose the initial number of shards; another drawback is that some shards may grow really large
  5. image-20240324164952499
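A small sketch of the fixed-number-of-shards idea from point 4: keys always hash to the same shard, and only the shard-to-server assignment changes when servers are added. The shard count, server names, and naive round-robin assignment are illustrative; a real balancer would move only a few shards instead of recomputing the whole assignment.

```python
import hashlib

NUM_SHARDS = 1024  # fixed up front; a key's shard never changes

def shard_for_key(key: str) -> int:
    digest = int(hashlib.md5(key.encode()).hexdigest(), 16)
    return digest % NUM_SHARDS

def assign_shards(servers: list[str]) -> dict[int, str]:
    """Spread the fixed set of shards across whatever servers currently exist (naively)."""
    return {shard: servers[shard % len(servers)] for shard in range(NUM_SHARDS)}

servers = [f"server-{i}" for i in range(64)]
assignment = assign_shards(servers)

key = "user-42"
shard = shard_for_key(key)
print(shard, assignment[shard])

# Adding a 65th server changes which server hosts some shards,
# but not which shard the key belongs to.
assignment = assign_shards(servers + ["server-64"])
print(shard, assignment[shard])
```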

Consistent hashing

How to implement

Pros and cons

Virtual nodes

Application

  1. How to implement
    1. Boundaries for each shard (the range of hashes each shard owns)
    2. Where each shard lives (which physical server each shard lives on)
    3. How to rebalance shards (pick and implement a rebalancing strategy)
  2. Consistent hashing partitioning
    1. A hash function maps a key to an integer -> computed for every request -> it needs to be fast
    2. Cryptographic hash functions (e.g., SHA) are expensive vs. non-cryptographic hashes (general purpose, e.g., MurmurHash)
    3. How to split the hash range into smaller intervals, and how to assign each interval to a server
  3. Consistent hashing -> a circle; pick a random position as 0
    1. Move clockwise on the circle
    2. Take the list of servers and calculate their hash values based on hostname or IP
    3. Each server owns the range of hash values between itself and the next server (clockwise)
      1. image-20240324172051274
  4. Easy to implement (simplicity and speed); see the sketch at the end of this section
    1. Take the list of server identifiers
    2. Apply the hash function
    3. Get the hash range for each server
    4. Sort the list of hash ranges
    5. Share this ordered list with clients (so they can route requests)
    6. Using binary search, a client can quickly find the server to send a request to
    7. The client forwards the request to that server
  5. How rebalancing works with consistent hashing
    1. Adding or removing a single server only impacts its neighbor
      1. image-20240324172602048
    2. Data is also stored on neighbors (replicas) to avoid data loss
      1. image-20240324172828836
    3. Issue 1: domino effect: when a server goes down due to high load (or anything else), its load moves to the next neighbor, which may then go down for the same reason, and the excess load keeps cascading, taking servers down one by one
    4. Issue 2: servers don't split the circle evenly
  6. To address these two issues -> virtual nodes
    1. image-20240324173140714
    2. Now each server is responsible for a set of ranges -> more even distribution (although more memory is needed to hold and look up the vnodes)
  7. Application
    1. Databases (Cassandra, Couchbase, Riak, Voldemort) -> helps distribute data evenly across shards, rebalance, and minimize data movement
    2. Distributed caches (client libraries, e.g., Ketama)
    3. CDNs (Akamai)
    4. Network load balancers (Maglev) -> helps LBs distribute connections among backend servers; when a backend server becomes unavailable, only the connections on that server are reshuffled
    5. Chat applications (Discord): mapping users to services
    6. Messaging systems (RabbitMQ)
  8. If data is short-lived and doesn't need to be moved around for rebalancing -> basic partitioning -> a simple mod-based hash is enough.
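To tie the section together, a compact sketch of a consistent-hash ring with virtual nodes, using binary search for lookups as in point 4; the vnode count and the use of MD5 are arbitrary choices, not a reference implementation.

```python
import bisect
import hashlib

class ConsistentHashRing:
    def __init__(self, servers: list[str], vnodes: int = 100):
        # Place each server on the ring many times ("virtual nodes") so the
        # hash ranges are split more evenly and rebalancing load spreads out.
        self._ring = sorted(
            (self._hash(f"{server}#{i}"), server)
            for server in servers
            for i in range(vnodes)
        )
        self._hashes = [h for h, _ in self._ring]

    @staticmethod
    def _hash(value: str) -> int:
        return int(hashlib.md5(value.encode()).hexdigest(), 16)

    def server_for(self, key: str) -> str:
        # Binary search for the first vnode clockwise from the key's hash position.
        index = bisect.bisect_right(self._hashes, self._hash(key)) % len(self._hashes)
        return self._ring[index][1]

ring = ConsistentHashRing(["10.0.0.1", "10.0.0.2", "10.0.0.3"])
print(ring.server_for("booking-123"))
# Adding a server only remaps the keys that fall into the new vnodes' ranges;
# all other keys keep their current server.
```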