Introduction
HRTers process real-time price and volume data for over a million symbols across global markets every day. If we want humans to have a chance of keeping up, we need that data efficiently represented in a tabular format! SmallGrid is a platform that allows running processes to publish real-time live tabular data. A SmallGrid data update may change a row in a table, or just a single cell. SmallGrid’s system design allows automatic deduction of the affected downstream aggregations that depend on the updated data, and is the backbone for serving a dynamic view of market data, and various system metrics. During his time at HRT, Summer Core Intern Will Gulian worked on implementing the initial SmallGrid backend that listens for and distributes data updates. In the future, we plan to implement optimized aggregation updates to unlock SmallGrid’s scaling potential.
Hengchu Zhang, Algo Engineer
SmallGrid
For anyone exploring data, the interactivity provided by a Jupyter notebook and pandas is incredible. You can see any data you want as it works its way through the notebook. Unfortunately, we lose the ability to introspect all the data when productionizing a notebook. There’s no interactivity so the best you can do is log some of the current state and check the logs later, but markets are dynamic, they change constantly. A strategy that’s worked well in the past may stop working unexpectedly. There could be a bug, maybe someone else discovered the same strategy, or maybe the ever-turbulent markets just behave differently now. In any case, we would really like to see what a strategy is “thinking” live and in a clean way, with minimal overhead.
To provide better visibility into live data, my project for the 2nd half of the internship at HRT was to work on a data publishing API called SmallGrid. SmallGrid has several moving pieces because the data needs to have a simple publishing API, but the API could be called from reasonably hot code so it needs to be efficient if necessary as well. To minimize the work code using SmallGrid needs to perform, there is also a SmallGrid daemon that handles actually publishing the table data. We use Redis extensively and are also looking into Apache Kafka for some use cases so SmallGrid supports publishing to both Redis Streams and Kafka topics.
Writing Data
Algo developers at HRT use Python and C++ so the SmallGrid API needs to be convenient to use in either language. The API I settled on has global tables accessible by name and cells can be written to by row and column:
auto t = SmallGrid::table("my table name");
t.publish("SPY", "MidPx", 440.70);
t.publish("AAPL", "MidPx", 147.06);
Doubles are the most commonly used type but other types, including strings and booleans, are also available. Internally, the different types are stored using std::variant
which uses the same memory to store different types, like a union, but provides a safe wrapper by including a tag to determine what type is currently stored.
t.publish("SPY", "Exchange", "Arca");
t.publish("AAPL", "Fruit", true);
A surprising catch with using variants is that std::visit()
is slower than you’d expect. It’s not that slow and I still used it but in some microbenchmarks, std::visit
isn’t magically faster than dynamic dispatch and can actually be slower than using a virtual function.
While std::visit
wasn’t too concerning, using std::unordered_map
could be. Looking up rows and columns by name is very common and using unordered_map
, a string
would need to be constructed each time we want to resolve the name. Most strings should be small enough to benefit from the Small String Optimization, and thus not require a heap allocation, but it would be nice to avoid constructing a string in the first place. Fortunately abseil comes to the rescue with a faster map implementation absl::flat_hash_map
that also supports heterogenous lookup so finding a const char *
or std::string_view
does not require constructing a temporary std::string
.
Even with abseil, we still need to hash the strings and it would be nice to avoid that if possible. For this case, row and column names can be converted to a handle to avoid repeatedly rehashing the names using row/col()
for single names and rows/cols()
for many names. We can also retrieve a cell handle with cell()
that only requires indexing into a vector to update a cell value.
auto goog = t.row("GOOGL");
// or many rows...
auto [aapl, msft] = t.rows("AAPL", "MSFT");
auto midPx = t.col("MidPx");
t.publish(goog, midPx, 2711.91);
auto msftPx = t.cell(msft, midPx);
// alternatively: msftPx.publish(288.33);
msftPx = 288.33;
C++17’s structured binding makes creating many handles very convenient auto [aapl, msft] = t.rows("AAPL", "MSFT");
, and it’s surprisingly simple to implement with the right parameter pack incantations.
template <typename... Ts> array<AxisHandle<AxixRow>, sizeof...(Ts)> rows(Ts... ts) {
return {row(ts)...};
}
You might notice that row()
and therefore rows()
returns AxisHandle<AxisRow>
. Row handles and column handles are different types AxisHandle
/ AxisHandle
which makes accidentally using a row handle as a column or vice versa fail to compile with a nice error message (for C++ standards). The incorrect usage is highlighted, with SmallGrid::AxisRow
and SmallGrid::AxisCol
in the last message in bright ANSI green thanks to changes in GCC 8.
src/data/test/SmallGrid_test.cc:229:29: required from here
src/data/SmallGrid.h:915:51: error: no matching function for call to ‘resolveIndex<SmallGrid::AxisCol>(SmallGrid::StringIndex&, SmallGrid::AxisHandle<SmallGrid::AxisRow>&)’
915 | auto colIdx = resolveIndex<SmallGrid::AxisCol>(colIndex, col);
| ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^~~~~~~~~~~~~~~
...
src/data/SmallGrid.h:897:15: note: candidate: ‘size_t SmallGrid::resolveIndex(SmallGrid::StringIndex&, SmallGrid::AxisHandle<Type>) [with Type = SmallGrid::AxisCol; size_t = long unsigned int]’
897 | inline size_t resolveIndex(StringIndex& ind, AxisHandle<Type> handle) {
| ^~~~~~~~~~~~
src/data/SmallGrid.h:897:63: note: no known conversion for argument 2 from ‘AxisHandle<SmallGrid::AxisRow>’ to ‘AxisHandle<SmallGrid::AxisCol>’
897 | inline size_t resolveIndex(StringIndex& ind, AxisHandle<Type> handle) {
| ~~~~~~~~~~~~~~~~~^~~~~~
The API in Python looks very similar, with some added features to make using SmallGrid in Python more convenient.
# pyatl = ‘Python Automated Trading Library’
# It contains Python bindings exported from our C++ trading library
from pyatl import SmallGrid
t = SmallGrid.table("my table name")
t.publish("SPY", "MidPx", 440.70)
t.publish("AAPL", "MidPx", 147.06)
# handles
goog = t.row("GOOGL")
aapl, msft = t.rows("AAPL", "MSFT")
midPx = t.col("MidPx")
t.publish(goog, midPx, 2711.91)
msftPx = t.cell(msft, midPx)
msftPx.publish(288.33)
The Python tables have a function toDataFrame()
which creates and returns a Pandas data frame of the table data. While probably not useful in a production environment, this makes writing tests much more convenient.
print(t.toDataFrame())
# => MidPx
# => SPY 440.70
# => AAPL 147.06
# => GOOGL 2711.91
# => MSFT 288.33
Interacting with Pandas is surprisingly easy with pybind11
, HRT’s python <-> C++ binding library of choice. It has special support for numpy arrays if I were worried about performance, but I just want to call pd.read_json(string, orient='split')
and doing so is easy with pybind.
pyTable.def(
"toDataFrame",
[](const SmallGrid::Table& table) {
auto tableJson = table.dataToJson();
auto pd = py::module_::import("pandas");
auto readJson = pd.attr("read_json");
return readJson(tableJson, "orient"_a = "split");
}, "docstring...");
There are also bulk publishing functions publishRow()
and publishCol()
with keyword arguments to update several stats about a particular symbol at once. Suppose we want to update the price and volume for Microsoft, we can do that in one line.
t.publishRow("MSFT", MidPx=288.33, Volume=1.3e7)
Using a variant here makes the autogenerated documentation from pybind much clearer, pybind will annotate the argument with the type Union[str, RowHandle]
. Handling function kwargs is made easy by the fact that we just need to iterate over each kwarg entry although comically structured bindings and lambda captures don’t mix (example and discussion) so the entry
pair cannot be bound here.
using RowVar = variant<string_view, SmallGrid::AxisHandle<AxixRow>>;
pyTable.def(
"publishRow",
[](SG::Table& table, RowVar rowVar, py::kwargs kwargs) {
for (auto& entry : kwargs) {
visit([&](auto& row) {
table.publish(row, py::cast<string_view>(entry.first),
py::cast<SmallGrid::ValueType>(entry.second));
}, rowVar);
}
},
"docstring...");
not-Writing Data
All the examples so far are about SmallGrid’s publishing API, however the best monitoring solution is one you don’t have to think about. That’s why, along with the push-based writing to cells, SmallGrid can also watch variables automatically for code that uses our event loop system. It might seem weird to write code in an event loop style, but it makes all kinds of workloads faster. For IO-bound work, the event loop can use epoll() to sleep just until any IO operation has completed at which point new work can be performed.
Since a lot of code at HRT operated on an epoll event loop, it makes adopting SmallGrid monitors easy with existing code because it doesn’t require any changes to get working other than setting up the monitor itself and stashing the handle. The handle is what keeps a particular watch active. It could be confusing if someone forgets to store the handle but by using C++17’s [[nodiscard]]
attribute combined with -Werror
, forgetting the handle triggers a compile error.
// subscribe on eventLoop, every 50 ms and write the collected variables to "myTable".
auto m = SmallGrid::Monitor::get(eventLoop, "myTable", 50ms);
int64_t counter = 0;
// a handle for this monitored variable. the reference must not be invalidated while the handle is alive.
auto _h = m.monitor("row1", "col1", counter /* reference */);
// increment the counter 10x per second... the table cell will be updated automatically.
eventLoop.intervalCB(100ms, [&counter]() {
counter++;
});
// Run the event loop so the callbacks are triggered.
eventLoop.run();
Callbacks are also supported, in fact, watching a reference is implicitly converted into a callback internally. We have our own callback type (similar to proposed std::function_ref
types) that is much lower overhead than std::function
but this is transparent to the user of SmallGrid who can provide lambdas or callbacks.
vector<bool> myVector;
// bind to a lambda
m.monitor("row1", "col1", [&]{ return myVector.size(); });
// bind to a member function of an object.
m.monitor("row1", "col2", member_cb(&myVector, &vector<bool>::size));
SmallGrid daemon
Writing data to some tables in memory is nice, but then we need to get the table data out of the trading process. For most use cases, writing to a file or a socket would be the obvious choice but, SmallGrid could be used in hot path code, so we want to avoid the overhead of any syscalls by moving the communication with our database, Kafka or Redis, into a daemon process.
But first, the messages need to get to the daemon process. We could communicate with the daemon over a unix socket. Instead, we do something even cooler: we create a shared memory ring buffer (which I will call a shmqueue). This is what the kernel would do behind the scenes anyway when you open a unix socket between two processes but by creating the shmqueue ourselves, we can read and write to the shmqueue with just the overhead of writing to memory.
There are several ways to create a memory region shared between two processes. Ironically, to set up the shmqueue we do use a unix socket, but the socket is not used once the shmqueue is setup (the socket closing is useful to indicate the other side has died). You could use the socket to communicate a shm file path, but a fancier solution involves using sendmsg()
and recvmsg()
and ancillary data to send a file descriptor to the other process. By sending the file descriptor, the processes don’t need to worry about file permissioning since the daemon process doesn’t need to open()
the shm file itself.
Once the shmqueue is configured, a SmallGrid client process just needs to write a cell update message (a simple struct) and can then get back to more important things. On the other side, the daemon process picks up any cell updates from each of the shmqueues and writes the updates to the Kafka topic or Redis stream.
There’s often many different processes on one machine, from trading processes to risk monitoring tools that may want to write to SmallGrid so centralizing this into a daemon cuts down on the required number of connections to Kafka or Redis (both are deployed in clusters) from an O( * )
connection count to just O( + )
connections handled by the daemon.
Publishing Updates
Once the cell updates get to the daemon, they need to get to the database. We have a nice Redis client, but HRT does not use Apache Kafka internally very much. I ended up using modern-cpp-kafka (which is a wrapper around the standard librdkafka) and this mostly worked, but there were some surprises along the way. modern-cpp-kafka
has a KafkaSyncProducer
which will synchronously write messages. This worked out of the box but did not have anywhere near the throughput I was looking for. modern-cpp-kafka
also has a KafkaAsyncProducer
and the async producer has a very neat-sounding parameter EventsPollingOption
. If EventsPollingOption::Manual
is provided, a function .pollEvents()
needs to be called periodically to process callbacks for any completed (or failed) message send()
calls. Unfortunately, pollEvents()
does not behave nicely with an event loop style because it does not have an execution timeout (there is a timeout parameter but the timeout determines how long to wait for new messages). I was working with very large message batch sizes, and when 2 million messages all got committed at once, .pollEvents()
would try to execute then destruct 2 million std::function
s and this could easily take over a second. Since other work on the event loop needs to get execution time too, this was unacceptable.
To solve this, I did not use EventsPollingOption::Manual
which means my callbacks would be called automatically (from a different thread) at which point the results would be written to my own queue, and the event loop can process the completed messages in chunks while letting any other tasks also get execution time.
Conclusion & Future Work
Will Gulian’s implementation provides a solid foundation for distributing tabular data updates. We are in the process of an initial rollout of tabular dashboards powered by SmallGrid – it’s very exciting! As a next step, we plan to implement incremental aggregation. Incremental aggregation batches tabular data updates, and only applies the updates to downstream aggregated values that depend on updated cells. This feature would increase tabular update throughout, and would avoid re-computing aggregated values unaffected by updates.
Hengchu Zhang, Algo Engineer