Corfu: The Protocol

by simbo1905

This is the eighth and last in a series of posts about Corfu, a strongly consistent and scalable distributed log. To quote the paper:

At its base, CORFU derives its success by scaling Paxos to data center settings and providing a durable engine for totally ordering client requests at tens of Gigabits per second throughput.

It does this through a number of existing techniques:

Our design borrows bits and pieces from various existing methods, yet we invented a new solution in order to scale SMR [state machine replication] to larger cluster scales and deliver aggregate cluster throughput to hundreds of clients.

What I really like about Corfu is that it innovates by bringing existing techniques such as Paxos together into something which is a real breakthrough. The protocol itself is very simple. The established techniques it uses to implement the protocol are very simple. Yet when assembled into the solution they become extremely powerful. This post will put the pieces together and discuss the Corfu protocol.

The key characteristic of Corfu is its client-centric architecture:

Our design places most CORFU functionality at the clients, which reduces the complexity, cost, latency, and power consumption of the storage units.

Clients are typically application servers using the Corfu system as a shared global log. Moving the logic into the clients is a paradigm shift away from typical architectures for strong consistency. Algorithms such as Paxos, Raft or ZAB have a cluster of nodes running their own network protocol. In contrast, the storage nodes in a Corfu cluster don’t exchange any messages at all. They are remarkably dumb.

That isn’t quite the full story though. If we get some sort of split-brain situation where different clients are using different configurations, the Corfu distributed log will be trashed. To solve this Corfu uses Paxos to ensure that the cluster configuration is made strongly consistent. Yet that is slowly changing metadata, and the network protocol that manages it runs out-of-band to the main transactional workload.

To get a flavour of how little logic is needed to run the main transactional load, we can cover the full storage node logic in a few paragraphs. The state maintained by the storage units is:

  • An “epoch” number. This is an integer corresponding to the cluster configuration version which is valid to interact with the storage unit. It is managed by clients running Paxos out-of-band to the main workload. More on that below.
  • A mapping of logical log pages onto physical disk pages. This is described as a hash map within the paper.

Copy compaction (Part 5) as well as stripe and mirror (Part 4) mean that the mapping of logical pages onto physical disk pages is arbitrary. A hash map can hold any arbitrary mapping with an O(1) lookup. That, along with a big SSD holding the main data, is exposed in the following simple network API:

  • read(epoch,address) validates that the epoch of the client matches the local epoch number. Upon failure it returns err_sealed. Upon success it runs the address through the map to return the physical page of data.
  • write(epoch,address,value) validates that the epoch of the client matches the local epoch. Upon failure it returns err_sealed. Upon success it runs the address through the map to get the physical page address, then checks the write-once flag and the deleted flag. Upon failure it returns err_written or err_deleted. Upon success it writes the value into the physical address.
  • delete(address) marks the address as deleted. This doesn’t validate the epoch, which looks risky to me.
  • seal(epoch) is used during a reconfiguration. It first validates that the epoch being set is higher than the current local epoch number. Upon failure it returns err_sealed. Upon success it sets the local epoch to the one supplied, then returns the highest page number written by any command. All the actual logic of how to do a reconfiguration runs in a client, which we will cover below.

We can see that the epoch is a guard to prevent clients from reading and writing data using an out of date configuration. Other than that the storage node just uses its internal hashmap to resolve a logical client address to a local disk page. We only need 1 bit to know if a page is written and another bit to know if the page has been deleted. Easy peasy.
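To make that concrete, here is a minimal sketch of a storage unit in Python. It is illustrative only: it keeps pages in an in-memory dict rather than on an SSD, and the class and error names are my own invention, not from the paper.

```python
# Hypothetical sketch of a Corfu storage unit. Pages live in a dict
# standing in for the logical-to-physical map plus the SSD.

class SealedError(Exception): pass     # err_sealed
class WrittenError(Exception): pass    # err_written
class DeletedError(Exception): pass    # err_deleted
class UnwrittenError(Exception): pass  # err_unwritten

class StorageUnit:
    def __init__(self):
        self.epoch = 0        # cluster configuration version
        self.pages = {}       # logical address -> value (the hash map)
        self.deleted = set()  # addresses with the deleted bit set
        self.highest = -1     # highest page number written

    def read(self, epoch, address):
        if epoch != self.epoch:
            raise SealedError()        # client has a stale configuration
        if address in self.deleted:
            raise DeletedError()
        if address not in self.pages:
            raise UnwrittenError()
        return self.pages[address]

    def write(self, epoch, address, value):
        if epoch != self.epoch:
            raise SealedError()
        if address in self.deleted:
            raise DeletedError()
        if address in self.pages:
            raise WrittenError()       # write-once flag already set
        self.pages[address] = value
        self.highest = max(self.highest, address)

    def delete(self, address):
        # note: no epoch check, matching the paper's API
        self.deleted.add(address)

    def seal(self, epoch):
        if epoch <= self.epoch:
            raise SealedError()
        self.epoch = epoch
        return self.highest            # tell the client the written tail
```

The whole unit is a guard check plus a map lookup, which is the point: all the interesting decisions happen in the client.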

This leaves the main logic in the client. We have seen that clients interact with a global sequencer (Part 3) to append to the log. The client API is actually reasonably straightforward:

  • append(b) appends a value b to the global log, getting back the log position l. The client driver first goes to the global sequencer to obtain the next 64-bit number, which will be returned if the write is successful. The driver then uses the versioned cluster configuration to translate this logical log position to a page offset on a replica set. The write messages sent to the replicas include the configuration version so that storage nodes can reject writes from clients using an out-of-date configuration. Messages are sent to each storage node in the replica set in a strict order. Upon receipt of err_written the driver can loop to obtain a new position from the global sequencer and reattempt the write. Upon receipt of err_sealed the client needs to reload the cluster configuration to learn the configuration associated with the next epoch number.
  • read(l) reads a page of data at a given position l. The client driver uses the versioned cluster configuration to translate this logical log position to a page offset on a replica set. The read message is sent to the last replica in the strict write ordering. The read message includes the configuration version so that the storage node can reject reads from clients using an out-of-date configuration. If the client driver receives err_unwritten it can check the first replica and copy any value found to the remaining replicas. If no value is found the error is returned. Upon receipt of err_sealed the client needs to reload the cluster configuration.
  • fill(l) fills a position in the log with a noop value. This is used by the application when it suspects another client has crashed during an append operation, having obtained the next number from the global sequencer but before successfully writing any value. The majority of the client code can be shared with the append functionality, skipping the trip to the sequencer.
  • trim(l) deletes the value at a given position. The client logic will be similar to the fill functionality.
  • reconfig(changes) applies a cluster reconfiguration using a blackbox Paxos engine which is discussed below.

Append lets the sequencer pick the location and writes a real value. Fill writes a noop value to a location. Trim sets the deleted bit on a location. The cluster configuration maps a location onto a replica set using a stripe and mirror technique. Simple stuff. Very cool.
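The append retry loop can be sketched in a few lines. This is a hedged illustration, not the real driver: the Sequencer and Replica classes are toy stand-ins, and the projection is just a callable mapping a position to an (epoch, replica set) pair.

```python
# Hypothetical sketch of the client-side append path.

class SealedError(Exception): pass   # err_sealed
class WrittenError(Exception): pass  # err_written

class Sequencer:
    """Toy global sequencer handing out the next log position."""
    def __init__(self):
        self.n = -1
    def next(self):
        self.n += 1
        return self.n

class Replica:
    """Toy write-once storage node."""
    def __init__(self):
        self.epoch, self.pages = 0, {}
    def write(self, epoch, pos, value):
        if epoch != self.epoch:
            raise SealedError()
        if pos in self.pages:
            raise WrittenError()
        self.pages[pos] = value

def append(value, sequencer, projection):
    """projection: pos -> (epoch, [replicas]) from the current config."""
    while True:
        pos = sequencer.next()
        epoch, replicas = projection(pos)
        try:
            for r in replicas:   # strict write ordering across the set
                r.write(epoch, pos, value)
            return pos           # durable once the last replica acks
        except WrittenError:
            continue             # another client holds this position; retry
        # err_sealed would be handled here by reloading the configuration
```

Fill and trim would reuse the same chain-writing code, just skipping the sequencer call since the position is already known.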

The last thing to consider are cluster reconfigurations. The canonical reason to do this is that we might have a storage unit crash. This is described in the following pseudocode taken from the paper:

Operation RECONFIG(changes):
  send (seal, c.epoch) request to storage servers
  wait for replies from at least one replica in each extent
  compute new projection newP based on changes and (sealed,*) replies
  using any black-box consensus-engine:
    propose newP and receive decision
  set P to decision
  increment c.epoch

Note that the client seals the storage units before computing a new projection of logical ranges onto the storage units. It then runs that computed configuration through a black-box consensus engine (they use something “Paxos-like” in the paper). Why? Well, after a crash or network partition it is not clear to an arbitrary client what the last written position is. The client sealing the units needs to learn the last written position from the responses to the seal command. It can then create a new extent of logical log starting at the next position, mapped onto a healthy replica set. This means that reconfiguration leads to a short stop-the-world event to recover from a crash.
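The pseudocode above can be sketched in Python to make the seal-then-propose ordering explicit. Everything here is illustrative: the Unit, Config and Consensus classes are stand-ins I invented, and the consensus engine is deliberately a trivial black box.

```python
# Hedged sketch of RECONFIG: seal the old epoch, learn the written
# tail, compute a new projection, then decide it via consensus.

class Unit:
    """Toy storage unit exposing only seal()."""
    def __init__(self, highest):
        self.epoch, self.highest = 0, highest
    def seal(self, epoch):
        if epoch <= self.epoch:
            raise ValueError("err_sealed")
        self.epoch = epoch
        return self.highest        # highest page written on this unit

class Config:
    def __init__(self, epoch, projection, tail):
        self.epoch = epoch
        self.projection = projection  # range label -> replica set
        self.tail = tail              # last written position

class Consensus:
    """Stand-in for a black-box Paxos engine: first proposal wins."""
    def __init__(self):
        self.decided = None
    def propose(self, value):
        if self.decided is None:
            self.decided = value
        return self.decided

def reconfig(changes, config, units, consensus):
    new_epoch = config.epoch + 1
    tails = [u.seal(new_epoch) for u in units]  # seal before computing newP
    tail = max(tails)                           # log tail learned from seals
    new_projection = dict(config.projection)
    new_projection.update(changes)              # remap ranges per the changes
    return consensus.propose(Config(new_epoch, new_projection, tail))
```

The crucial property is that sealing happens first, so no write in the old epoch can land after the tail has been observed.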

What about the data already written to a replica set where we lost a storage server? The new configuration can direct reads to the unhealthy replica set. In the background the client can then copy that data onto a healthy replica set. Then we can reconfigure a second time to remap the range from the unhealthy replica set onto the replacement.

A corner case to consider is a client and a replica set being partitioned from the rest of the cluster. On the majority side of the partition a reconfiguration could be taking place to remove the replica set. The client can then get a false err_unwritten and look to fill the position with a no-op. The paper asserts that this maintains “serialisation of reads”. This means that if you read a value you will continue to read a value. They are asserting that reading a no-op isn’t a read; it is more like a null or Nil value meaning “undefined”. The idea is that if the partition is healed the application can be reset and will see the real value. Sounds a bit messy, but if we can automate the reset then why not.

They give an alternative approach. We can have each storage unit renew a lease. The management of the leases can enforce that no two storage units have an overlapping active range. The catch with leases is clock skew, such that we have to wait a while before a partitioned node loses its lease. Still, leases are a pretty standard technique. The logical thing to do is run the renewals through the same black-box Paxos algorithm to ensure that the leases are made strongly consistent across the cluster.
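The skew handling can be sketched with the standard lease trick: the holder stops serving a safety margin before expiry, and the cluster only reassigns the range a margin after expiry. This is a generic pattern, not something specified in the paper; the names and the margin value are my own.

```python
# Hypothetical lease check with a clock-skew safety margin.

SKEW_MARGIN = 0.5  # assumed maximum clock drift in seconds

class Lease:
    def __init__(self, expiry):
        self.expiry = expiry  # expiry time on the granter's clock

def holder_may_serve(lease, now):
    # the holder stops early, so even a fast local clock cannot let it
    # serve past the granter's view of expiry
    return now < lease.expiry - SKEW_MARGIN

def granter_may_reassign(lease, now):
    # the granter waits out the margin before handing the range to
    # another unit, so active ranges never overlap
    return now > lease.expiry + SKEW_MARGIN
```

The gap between the two checks is exactly the “wait a while” cost of leases mentioned above: a window of 2 × SKEW_MARGIN in which nobody serves the range.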