TRex: A Paxos Replication Engine (Part 2)

by simbo1905

The last post gave a high-level overview of TRex, an embeddable Paxos engine for the JVM. In this second post we will take a high-level walk through a Java demo that wraps a trivial stack and deploys it replicated across a cluster of nodes.

Warning: this article covers the 0.1 codebase, which is tagged here. The latest code on the master branch may have changed significantly.

A stack is a rather unrealistic service. Yet as a toy application it has the virtue of being simple enough to fade into the background. This allows us to focus on how the simple service is being replicated across a cluster of nodes using TRex. The techniques shown in the demo can be used to replicate a more realistic service, say one that uses a local database to make state durable. There is a Scala demo application that does exactly that.

The Java stack demo application has four files:

  1. StringStack.java is the interface that defines the service we are going to replicate across a Paxos cluster.
  2. StringStackImpl.java is the implementation of the service interface. It is just a shim over a java.util.Stack and a java.io.RandomAccessFile that makes the state of the stack durable between restarts.
  3. StackClient.java is a client command-line application. It reads instructions to “push” and “pop” strings from standard input. Depending upon the command-line parameters it uses either a local StringStack or a remote proxy to a Paxos cluster of nodes running replicated copies of a StringStack.
  4. StackClusterNode.java is a launcher for the Paxos cluster nodes.

We will skip over the first two files as they are trivial. Their purpose is to represent an arbitrary application service that we can either run locally for unit testing or deploy remotely as a network addressable service. We will start with the StackClient program. Running it with the command-line parameter “local” instantiates it with an in-memory StringStack running a REPL where we can push strings onto the stack and then pop them off. A transcript of an interaction looks like:

java -cp [...] com.github.trex_paxos.javademo.StackClient local
using local stack
>push hello
>push world
>pop
world
>pop
hello

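The code to achieve that is pure boilerplate. A minimal sketch of such a REPL loop (illustrative only, not the demo's actual code) might look like this:

import java.util.Scanner;
import java.util.Stack;

// Illustrative sketch of a push/pop REPL over a local stack. In clustered
// mode the demo substitutes a remote proxy for the local stack.
public class StackRepl {
    public static void main(String[] args) {
        Stack<String> stack = new Stack<>();
        Scanner in = new Scanner(System.in);
        while (true) {
            System.out.print(">");
            if (!in.hasNextLine()) break;
            String[] parts = in.nextLine().trim().split("\\s+", 2);
            if ("push".equals(parts[0]) && parts.length == 2) {
                stack.push(parts[1]);
            } else if ("pop".equals(parts[0]) && !stack.isEmpty()) {
                System.out.println(stack.pop());
            }
        }
    }
}
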
Where things get interesting is when we pass in arguments to have it connect to a remote StringStack running on a Paxos cluster:

java -cp [...] com.github.trex_paxos.javademo.StackClient clustered client3.conf 127.0.0.1

Launched in this manner, the client program instantiates a Java dynamic proxy as the StringStack and forwards method invocations over the network to the TRex distinguished leader. A transcript of interacting with the program in this mode looks identical to running it in local mode.

The code which sets up a remote proxy to the clustered version of the stack is as follows:

static StringStack clusteredStack(final String configName, final String hostname) {
    // Load the cluster config file and override the hostname to bind to.
    Config systemConfig = ConfigFactory.load(configName).withValue("akka.remote.netty.tcp.hostname", ConfigValueFactory.fromAnyRef(hostname));
    ActorSystem system = ActorSystem.create("trex-java-demo", systemConfig);
    // Parse the static cluster membership from the same config file.
    Config config = ConfigFactory.load(configName);
    Cluster cluster = Cluster.parseConfig(config);
    // Create the driver actor that locates the leader and forwards commands to it.
    ActorRef driver = system.actorOf(Props.create(StaticClusterDriver.class, akka.util.Timeout.apply(100), cluster, 20));
    // Wrap the driver in a TypedActor dynamic proxy that implements StringStack.
    StringStack stack = TypedActor.get(system).typedActorOf(new TypedProps<StringStackImpl>(StringStack.class, StringStackImpl.class), driver);
    return stack;
}

This method takes a config file name such as “client3.conf” which defines the details of a three node static Paxos cluster. It parses the cluster details out of the config file with Cluster.parseConfig, then creates a StaticClusterDriver actor, passing it the details of the cluster. Finally it creates a dynamic proxy which implements the StringStack interface and converts method calls into synchronous messages forwarded to our StaticClusterDriver actor. That last step uses the standard Akka “Typed Actor” feature, which lets you adapt a regular application interface into the Akka system. This gives us an out-of-the-box adaptor between regular Java method calls and messages sent to an actor. In this case we send to a local actor which interacts with the Paxos cluster nodes over the network.
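Once created, the proxy is used exactly like a local stack. For example (hypothetical client code building on the clusteredStack method above; the push and pop signatures are assumed from the StringStack interface):

// Each call blocks until the cluster has chosen and applied the value.
StringStack stack = clusteredStack("client3.conf", "127.0.0.1");
stack.push("hello");
String value = stack.pop(); // returns "hello" once the pop has been replicated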

The salient point is that a method invocation on the interface blocks while a message encapsulating the method call is sent to an actor; the actor forwards it to the Paxos cluster. In Paxos terms the method call messages are client values. The cluster runs the consensus algorithm over the values and applies them in strict order to a local StringStack on each cluster node.

The StaticClusterDriver is provided with the configuration used to run the cluster, which looks like the following:

# 3 node localhost cluster configuration
trex {
  # folder to use to persist data at each node
  data-folder="/tmp"
  # number of slot entries to retain in the log to support retransmission
  data-retained=1048576
  # static cluster definition
  cluster {
    name = "PaxosCluster"
    nodes = "2552,2562,2572"
    node-2552 {
      host = "127.0.0.1"
      client-port = 2552
      node-port = 2553
    }
    node-2562 {
      host = "127.0.0.1"
      client-port = 2562
      node-port = 2563
    }
    node-2572 {
      host = "127.0.0.1"
      client-port = 2572
      node-port = 2573
    }
  }
  # timeouts
  leader-timeout-max=4000
  leader-timeout-min=2000
}

That config defines three nodes each running on the loopback interface. They each expose a TCP port to clients and a UDP port over which the Paxos Parliament algorithm is run. The StaticClusterDriver takes note of the TCP details of each node in the cluster and will scan through nodes until it finds the stable leader to run commands against. If that leader goes down it will scan again until it finds the replacement leader. The config is just enough for local integration testing on a single machine; a more realistic cluster would run nodes on different hosts bound to the same port numbers.
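To illustrate that scanning behaviour, here is a much simplified sketch. It is hypothetical code, not the actual StaticClusterDriver, which resends the real command messages and treats a command response as evidence of the stable leader rather than merely probing connectivity:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.List;

// Simplified sketch: cycle through the configured client endpoints until one
// accepts a connection, treating that node as the current leader.
class LeaderScan {
    static InetSocketAddress findLeader(List<InetSocketAddress> clientEndpoints, int timeoutMs) {
        while (true) { // keep scanning until some node is reachable
            for (InetSocketAddress endpoint : clientEndpoints) {
                try (Socket socket = new Socket()) {
                    socket.connect(endpoint, timeoutMs);
                    return endpoint; // the real driver would await a command response here
                } catch (IOException e) {
                    // node down or unreachable; try the next one in the cluster
                }
            }
        }
    }
}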

The final file in the demo, StackClusterNode.java, starts a cluster node taking the same configuration. We launch it three times, passing a different node number each time so that each node binds to different ports, using a command line like:

java -cp [...] com.github.trex_paxos.javademo.StackClusterNode server3.conf 2562

The code is as follows:

public static void main(String[] args) throws IOException {
    if (args.length != 2) {
        usage(1);
    }

    final String configName = args[0];
    final Integer nodeId = Integer.valueOf(args[1]);

    // The local service instance, made durable in a file unique to this node.
    final StringStack stack = new StringStackImpl(new File(System.getProperty("java.io.tmpdir") + "/stack" + nodeId.toString()));

    Config config = ConfigFactory.load(configName);
    Cluster cluster = Cluster.parseConfig(config);

    Node node = cluster.nodeMap().get(nodeId).get();
    File folder = new File(cluster.folder() + "/" + nodeId);
    if (!folder.exists() || !folder.canRead() || !folder.canWrite()) {
        System.err.println(folder.getCanonicalPath() + " does not exist or is not readable and writable. Exiting.");
        System.exit(-1);
    }
    // The Paxos journal that persists messages at this node.
    Journal journal = new FileJournal(new File(folder, "journal"), cluster.retained());
    Config systemConfig = ConfigFactory.load(configName)
            .withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(node.clientPort()))
            .withValue("akka.remote.netty.tcp.hostname", ConfigValueFactory.fromAnyRef(node.host()));

    final ActorSystem system = ActorSystem.create(cluster.name(), systemConfig);

    PaxosActor.Configuration conf = new PaxosActor.Configuration(config, cluster.nodes().size());

    // The TrexServer actor runs consensus and applies chosen values to the local stack.
    system.actorOf(Props.create(TrexServer.class, cluster, conf, nodeId, journal, stack));
}

The construction of the StringStackImpl sets up a local instance of the StringStack using a unique file to make the stack durable. TRex will keep the stack at each node in sync with all the others by only applying method invocation messages that are chosen by the consensus algorithm.

Most of the remainder of the method is boilerplate. The interesting line is the final one, which creates a TrexServer actor, passing it the cluster details, the node's unique number, the journal that persists messages, and the local StringStack object. The TrexServer actor has no compile-time dependency on the StringStack object. It takes the method invocation messages chosen by the consensus algorithm and reflectively invokes them on the local StringStack object. The use of reflection is only for the convenience of the demo and is entirely optional. The consensus logic is within a superclass that you can extend with code compiled against your own message format and application logic.
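To make the reflective delivery concrete, here is a minimal sketch. It assumes a chosen value carrying the method name and arguments; the actual TRex message type may well differ:

import java.lang.reflect.Method;

// Sketch: reflectively invoke a chosen "method call" value on the local service.
class ReflectiveDelivery {
    // Simplistic: looks the method up by the runtime classes of the arguments;
    // a real implementation would carry the declared parameter types in the message.
    static Object deliver(Object target, String methodName, Object[] args) throws Exception {
        Class<?>[] types = new Class<?>[args.length];
        for (int i = 0; i < args.length; i++) {
            types[i] = args[i].getClass();
        }
        Method method = target.getClass().getMethod(methodName, types);
        return method.invoke(target, args);
    }
}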

The choice of a FileJournal as the Paxos store is an implementation decision that gives at-least-once semantics during crash recovery. If a node crashes between applying a chosen message to the stack and updating the message journal to mark the message as completed, the message will be delivered a second time when the process is restarted. In the case of our stack this could lead to a double push or double pop at the crashed node. To handle this scenario, the deliver method which calls up to application code with the chosen value includes a message ID. This can be used to deduplicate repeated message deliveries during crash recovery. For brevity the demo does not include such “crash replay safety” logic. A more realistic application service that uses a database can achieve at-most-once semantics during crash recovery by providing a custom Journal object which participates in application transactions.
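For example, application code could persist the highest message ID it has applied and skip redeliveries. This is a sketch under the assumption that the deliver callback provides a monotonically increasing message ID, as described above; in a real service lastAppliedId would be persisted atomically with the stack state:

import java.util.Stack;

// Sketch of "crash replay safety": skip values already applied before a crash.
class DedupingStack {
    private final Stack<String> stack = new Stack<>();
    private long lastAppliedId = -1; // persist this atomically with the stack state

    synchronized void deliverPush(long msgId, String value) {
        if (msgId <= lastAppliedId) return; // duplicate redelivery after a crash
        stack.push(value);
        lastAppliedId = msgId;
    }

    synchronized String deliverPop(long msgId) {
        if (msgId <= lastAppliedId) return null; // duplicate redelivery after a crash
        lastAppliedId = msgId;
        return stack.pop();
    }
}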

Now that we have covered the demo application it is worth describing the full data flow. The client code invokes the StringStack interface methods on the TypedActor dynamic proxy. The TypedActor proxy sends a MethodCall message for each method invocation and blocks on a response. The MethodCall message goes to the TRex driver. The driver has Akka serialise the message and forwards it to the distinguished leader over TCP. At the other end the distinguished leader runs the Paxos algorithm over UDP. The chosen messages are invoked on all the replicated objects in consensus order. The method return value at the distinguished leader is sent back to the client TRex driver. It responds to the TypedActor, which completes the method call, returning any result to the client code.
