Overview of the Corda State Machine
February 19, 2020
Corda flows are written as linear Kotlin or Java code, but need to be enriched with a lot of other activities when being executed, e.g.:
- Checkpointing their current state for fault tolerance
- Executing subflows
- Sending and waiting for receipt of messages
- Asynchronous calls and waiting for results.
Corda achieves this by running them in a state machine based on Quasar fibers. This page gives a simplified overview of how the pieces fit together and should be a good starting point for an operational understanding.
Caveats
There are two things to keep in mind when using this information:
- This is a simplified description of what is going on, concentrating on the conceptual steps, and mostly looking at the happy path, ignoring error handling. You probably need to dig a bit deeper than this if you actually need to make changes to the state machine/the flow framework/P2P messaging. There is a very detailed description of the technical aspects of this on the Corda docsite.
- This describes the implementation of a Corda node as it stands today – however, this is an internal implementation and might be changed in the future. If you write a CorDapp, you should design it based on the official documentation and API – the concepts described here are meant to help understand those, not work around them.
What is Quasar?
Quasar is a Java library that uses clever byte-code rewriting and exceptions to provide continuations. The main concept is a fiber: a thread-like construct that runs code but can decide to suspend itself, at which point its state gets collected and stashed away, to be rehydrated and continued later.
In order to achieve this, Quasar requires any method that can be suspended to be annotated with its @Suspendable annotation. At start-up time/just before JIT, it rewrites the bytecode of all these methods. The main difference is that it adds throws SuspendExecution to the signature and adds a catch/rethrow block for SuspendExecution exceptions to all catch blocks so that this kind of exception cannot be caught in user code.
To suspend a fiber, the code calls a specific Quasar function (e.g. parkAndSerialize) which throws a SuspendExecution exception. This exception passes through all user code and gets caught by Quasar, which can extract the call stack to squirrel it away in serialized form. The byte code instrumentation also captures any values passed to functions on the call stack, local objects and thread-local storage so these can be stored to recreate the fiber later. All this gets stored in memory, and the executor lets go of its current thread of execution so it can be reused to run something else.
When re-instating the fiber, the call-stack gets recreated on a new thread, all the values get passed into functions as recorded, the thread local storage gets restored, and execution carries on at the point where it got stopped.
For this to work, it is crucial that all functions in the stack to be suspended are annotated as @Suspendable so that Quasar can instrument them. Otherwise the exception might get caught, or parameters are not captured correctly, and the fiber does not get suspended correctly. This is a common cause of errors in CorDapps (e.g. the transaction context missing error), and it can be quite tricky to figure out which function on the stack is missing the annotation.
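The failure mode described above can be illustrated with a toy model in plain Java. This is a sketch only: SuspendSignal, park and runOnScheduler are hypothetical names invented for the illustration and are not Quasar APIs, and the signal is an unchecked exception here purely to keep the example short. It shows why a method whose catch blocks were not rewritten can swallow the suspend request before the scheduler sees it.

```java
import java.util.ArrayList;
import java.util.List;

// Toy stand-in for the exception used to unwind the stack on suspension.
// Hypothetical: not a Quasar class.
final class SuspendSignal extends RuntimeException {
    SuspendSignal() { super("suspend requested"); }
}

final class SuspensionDemo {
    static final List<String> log = new ArrayList<>();

    // Stand-in for the call that parks a fiber: it unwinds the stack by throwing.
    static void park() { throw new SuspendSignal(); }

    // "Uninstrumented" user code: the broad catch swallows the signal,
    // so the scheduler never learns that a suspension was requested.
    static void uninstrumented() {
        try {
            park();
        } catch (RuntimeException e) {
            log.add("user code swallowed: " + e.getMessage());
        }
    }

    // "Instrumented" user code: behaves as if a catch/rethrow block
    // for the signal had been added in front of the user's handler.
    static void instrumented() {
        try {
            park();
        } catch (SuspendSignal s) {
            throw s; // always let the suspend request through
        } catch (RuntimeException e) {
            log.add("user code handled: " + e.getMessage());
        }
    }

    // Stand-in for the scheduler: returns true if the suspend request reached it.
    static boolean runOnScheduler(Runnable body) {
        try {
            body.run();
            return false;
        } catch (SuspendSignal s) {
            return true;
        }
    }
}
```

Calling SuspensionDemo.runOnScheduler(SuspensionDemo::instrumented) reports that the request arrived, whereas the uninstrumented variant returns normally and leaves only a log entry behind.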
The State Machine
The state machine is the core of what is called the Flow Framework in our user documentation. It wraps Quasar and uses it to turn the linear Kotlin or Java code inside the flow logic implementation in a CorDapp into code that can be suspended, the current state of which can be checkpointed, and that can be rerun from any checkpoint.
The state machine has an event feed, and one or more execution threads. Events trigger transitions between the possible states.
Every flow starts in the Pending state as a start flow message, and then transitions to running. In the simplest case, it finally reaches the end of its run function and transitions to success; the result is returned and any checkpoint data is cleaned up.
It can at any point be suspended (more on that below), and then revived later to carry on running.
If it errors, it moves to the FlowHospital, where three things can happen:
- If the error is a transient error, it will go back to pending to be retried from the last checkpoint
- If the error is fatal, the flow terminates as failed, an error is returned to the caller and all checkpoints are deleted
- If the error can’t be simply retried, but failing is not an option (e.g. because that would threaten the integrity of the ledger), the flow moves into a hold state. This requires human intervention to:
  - fix the condition that led to the error
  - restart the flow from the last checkpoint manually.
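The transitions described in this section can be written down as a small table. The sketch below is a reading of the prose, not Corda's internal state machine: the enum constants and the FlowTransitions class are hypothetical names, and modelling a manual restart from the hold state as a move back to pending is an assumption.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Hypothetical state names modelled on the prose; not Corda's internal types.
enum FlowState { PENDING, RUNNING, SUSPENDED, IN_HOSPITAL, ON_HOLD, SUCCEEDED, FAILED }

final class FlowTransitions {
    private static final Map<FlowState, Set<FlowState>> ALLOWED = new EnumMap<>(FlowState.class);

    static {
        ALLOWED.put(FlowState.PENDING, EnumSet.of(FlowState.RUNNING));
        // A running flow can suspend, finish, or hit an error.
        ALLOWED.put(FlowState.RUNNING,
                EnumSet.of(FlowState.SUSPENDED, FlowState.SUCCEEDED, FlowState.IN_HOSPITAL));
        ALLOWED.put(FlowState.SUSPENDED, EnumSet.of(FlowState.RUNNING));
        // The three hospital outcomes: retry, fail, or hold for an operator.
        ALLOWED.put(FlowState.IN_HOSPITAL,
                EnumSet.of(FlowState.PENDING, FlowState.FAILED, FlowState.ON_HOLD));
        // Assumption: a manual restart re-enters the normal path via pending.
        ALLOWED.put(FlowState.ON_HOLD, EnumSet.of(FlowState.PENDING));
        // Terminal states have no outgoing transitions.
        ALLOWED.put(FlowState.SUCCEEDED, EnumSet.noneOf(FlowState.class));
        ALLOWED.put(FlowState.FAILED, EnumSet.noneOf(FlowState.class));
    }

    // True if the toy model permits moving from one state to the other.
    static boolean canMove(FlowState from, FlowState to) {
        return ALLOWED.get(from).contains(to);
    }
}
```

For example, FlowTransitions.canMove(FlowState.IN_HOSPITAL, FlowState.PENDING) holds for the transient-error retry, while no transition leaves SUCCEEDED.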
Lifecycle of a flow
When thinking about the lifecycle of a flow, it is useful to think about user and kernel space inside a Corda node, where user space is the code run in the flow logic that users can provide, and kernel space is the state machine/flow framework logic around it that facilitates the execution of the run.
1 Start a flow
When a flow gets started via RPC, StartFlow gets called on the state machine. This schedules an event to start the flow (note that RPC handling and message deserialisation happen on a different thread pool). As soon as an execution thread becomes available, the state machine will process the message and transition from Pending to Running. This includes:
- Acquire the thread
- Acquire a database connection and open a transaction
- Instantiate the requested FlowLogic instance from the user CorDapp, including passing in any parameters
- Call the run function on the FlowLogic.
2 Run and suspend
Then the user code runs in the flow until it hits a point where a suspension is required. The most common reasons for suspension are:
- Sending and then waiting for a message
- Calling FlowLogic.sleep(), i.e. suspending the flow for a given amount of time
- Using the flow async API
For this example, let’s assume the flow needs to communicate with a counterparty and calls sendAndReceive(). This function calls out to kernel space and does the following things:
- Serialising the payload passed as a message to this function
- Putting this message onto the send queue where it will get picked up by the P2P messaging system. That system requires its own discussion page, so we’ll only mention here that we require it to be durable and to deliver exactly once: once we have posted the message, we assume it will be delivered eventually and that we will in due course get a reply, which will not be duplicated.
- Then it calls Quasar’s parkAndSerialize function, triggering the behaviour described above, recording the stack, values, objects and thread local storage.
- On top of that, the node also needs to deal with allocated resources that Quasar ignores. Most prominently, it needs to commit the open database transaction and release the database connection. Any other resources (e.g. locks) need to be dealt with as well. This is done in Corda code.
- Corda also stores a copy of the fiber checkpoint in the database so that a suspended flow can survive a restart of the node.
  - This uses a fully reflective Kryo serialiser so anything found on the stack can be serialised. This is a different serializer from the ones used in RPC and P2P communication, or for persisting states to the database. Those need to limit what they will accept for serialisation and deserialisation, whereas the checkpointing serialiser needs to deal with everything we throw at it and does not need to deal with the long term evolution of data formats and structures.
  - There is a list of things (mostly node/state machine infrastructure) that needs to be ignored for serialisation and will just be there at deserialisation time.
  - Certain classes (e.g. Corda Services) implement the SerializeAsToken interface, which means a token is recorded instead of the actual class instance. These classes need to provide a mechanism to get hold of an instance at deserialisation time. This is particularly useful for singletons of which we cannot have multiple copies.
- Then the fiber transitions from running to suspended, and the execution thread becomes available to run other tasks.
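The idea of recording a token instead of a singleton instance can be shown with an analogy in plain Java. The sketch below does not use Corda's SerializeAsToken interface or Kryo; it relies on the standard Java serialization hooks writeReplace and readResolve to get a comparable effect. LedgerService, FlowLocals and TokenDemo are hypothetical names made up for the illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical singleton service of which only one instance may exist.
final class LedgerService implements Serializable {
    static final LedgerService INSTANCE = new LedgerService();
    private LedgerService() {}

    // Write a small token in place of the service itself.
    private Object writeReplace() { return new Token(); }

    private static final class Token implements Serializable {
        private static final long serialVersionUID = 1L;
        // On read, hand back the live singleton rather than building a copy.
        private Object readResolve() { return INSTANCE; }
    }
}

// Hypothetical flow-local state that happens to reference the service.
final class FlowLocals implements Serializable {
    private static final long serialVersionUID = 1L;
    final String note;
    final LedgerService service;

    FlowLocals(String note, LedgerService service) {
        this.note = note;
        this.service = service;
    }
}

final class TokenDemo {
    // Serialises the state to bytes and reads it back, like a checkpoint round trip.
    static FlowLocals roundTrip(FlowLocals state) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(state);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (FlowLocals) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }
}
```

After TokenDemo.roundTrip(...), the restored object's service field refers to the same LedgerService.INSTANCE as before, so no second copy of the singleton is created.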
3 Receive and resume
Once the response message arrives, the P2P messaging system will put a message in the event queue such that the fiber can be woken up. When an execution thread becomes available, the state transition for the suspended fiber begins.
- The call stack and variables are recreated
- Resources are reacquired, e.g. a database connection is obtained and a transaction is opened
- The message is deserialized
- The message will only be acknowledged in Artemis when the flow hits the next suspension point or finishes. As acknowledging will remove it from the durable P2P messaging, this means that in case of failover, the flow will rerun from the checkpoint taken after sending the message and re-receive the message from the durable queue.
- Then the flow logic in user space gets invoked at the point where it called sendAndReceive and gets the content of the message as the return value of the function call.
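The deferred acknowledgement in the steps above can be sketched with a toy queue. This is an illustration under simplifying assumptions, not the Artemis client API: DurableQueue and ResumeDemo are hypothetical names, and a "crash" is simulated by returning before the checkpoint is written.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

// Toy durable queue: a message stays at the head until it is acknowledged.
final class DurableQueue {
    private final Deque<String> messages = new ArrayDeque<>();

    void post(String message) { messages.addLast(message); }
    String peek() { return messages.peekFirst(); }  // deliver without removing
    void acknowledge() { messages.pollFirst(); }    // remove the delivered message
    int size() { return messages.size(); }
}

final class ResumeDemo {
    // Handles the head message. The acknowledgement is only sent once the
    // next checkpoint has been recorded, so a crash in between leaves the
    // message on the queue to be received again after failover.
    static String process(DurableQueue queue, List<String> checkpoints,
                          boolean crashBeforeCheckpoint) {
        String message = queue.peek();
        if (crashBeforeCheckpoint) {
            return null; // simulated failover: no checkpoint written, no ack sent
        }
        checkpoints.add("handled " + message);
        queue.acknowledge();
        return message;
    }
}
```

A first call with crashBeforeCheckpoint set to true leaves the queue untouched, and a second call with false receives the same message again, which mirrors the rerun from the previous checkpoint.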
4 Run to completion
When the flow logic successfully reaches the end of its run function, control passes back to the kernel space.
- The database transaction is committed
- All checkpoints for this flow are removed from the database (as it succeeded, a partial rerun will not be required).
- Any outstanding messages need to be acknowledged in Artemis so they’re cleared from the queue.
- The return value is handed over to the RPC pool so it can be serialized and sent back to the calling client.
- The database connection is released, the fiber is terminated and the thread is freed up to run other tasks.
Implications
The above implementation has some implications for developers of CorDapps:
- Normal ACID database logic cannot be simply used in CorDapps, even though we expose a raw JDBC connection.
  - You cannot simply commit a transaction in a flow logic run method (calling FlowLogic.sleep(1.millis) is a possible work-around)
  - A suspension can cause a database commit at a surprising time.
  - If the flow fails, the transaction will be rolled back – if you did something with the JDBC connection, that will be lost. On retry, the flow will restart from the last message, so the database transaction will re-occur.
  - You cannot roll back the transaction – inside the flow logic you are inside the flow’s transaction, so if you write SQL code and something goes wrong and you roll back, you could threaten the integrity of the ledger
- Every part of the flow gets run at least once – however, if something goes wrong, the part between the last checkpoint and the point where the error happened will be rerun. The flow clears up any database transaction happening via a rollback so it can be re-run cleanly, but any other interaction with third-party components has to be idempotent or able to handle being called more than once.
- Annotation with @Suspendable has to be done very diligently. Due to the logic Quasar uses, it is not possible to call a suspendable function from a non-suspendable function, as this might lead to a non-suspendable function on the call stack during a suspend operation. (The other way round is fine: a suspendable function can call uninterruptible things such as database operations.)
- The byte code scanning/rewriting takes quite a bit of time at start-up, the suspend operation is not exactly cheap and so on.
- It is possible to write code where the instrumentation goes wrong (e.g. using suspendable methods inside a sufficiently complex Kotlin .map{} operation). This can result in invalid byte code and the JVM just giving up at runtime.
  This is a known issue with Kotlin (https://youtrack.jetbrains.com/issue/KT-19251) and there is now a compiler flag you can set to make it work more smoothly once you are aware of the issue:
    compileKotlin {
        kotlinOptions {
            freeCompilerArgs = ["-Xnormalize-constructor-calls=enable"]
        }
    }
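The requirement in the implications above that interactions with third-party components be idempotent, or tolerate being called more than once, can be sketched as follows. This is one common approach rather than anything prescribed by Corda: PaymentGateway is a hypothetical external system, and the idea of deriving the key from a flow identifier plus a step label is an assumption made for the example.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical third-party system that deduplicates on a caller-supplied key.
final class PaymentGateway {
    private final Map<String, Long> processed = new HashMap<>();
    private int sideEffects = 0;

    // Performs the charge once per key; a repeated call with the same key
    // returns the recorded amount without charging again.
    long charge(String idempotencyKey, long amount) {
        Long previous = processed.get(idempotencyKey);
        if (previous != null) {
            return previous;
        }
        sideEffects++; // the real, non-repeatable effect happens here
        processed.put(idempotencyKey, amount);
        return amount;
    }

    // Number of charges that actually took effect.
    int sideEffects() { return sideEffects; }
}
```

If the part of a flow between the last checkpoint and an error is rerun and calls charge("flow-42/step-1", 100) a second time, the gateway still records a single side effect, provided the key is stable across reruns.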