// Join starts working N ticks away from the market (usually) and then watches for the opposite side's inside price to become the same side's inside market; if there is then enough size on that market, it will join.
#include <memory>

#include "spark/lib/config/config.h"
#include "spark/lib/config/protos/system.pb.h"
#include "spark/lib/domain_model/Security.h"
#include "spark/lib/orderman/orders/server/IAlgoOrder.h"
#include "spark/lib/orderman/orders/server/IAlgoServices.h"
#include "spark/lib/plugin/SparkServices.h"
#include "spark/lib/orderman/order_factory/IOrderFactory.h"
#include "spark/lib/orderman/orderman_utils/Book.h"
#include "spark/lib/orderman/orderman_utils/orderman_utils.h"
#include "spark/lib/utils/PBUtils.h"

using namespace bts::api::orderman;
using namespace bts::orderman;

namespace bts {
namespace order_example {

// Join starts working N ticks away from the market (usually) and then
// watches for the opposite inside to become same side inside market;
// if there is then enough size on that market, it will join.
class DynamicJoin : public IAlgoOrder {
public:
    DynamicJoin();
    ~DynamicJoin();

    // Called once, outside of the latency sensitive path: validates the
    // order parameters and, when not attaching, prepares the child
    // template used later by do_launch().
    OrderOpResult do_before_launch(IBeforeLaunchAlgoServices &) override;

    // Called once, in the latency sensitive path, when the algo is
    // first launched: launches the child and subscribes to the book.
    void do_launch(IAlgoServices &) override;

    // Same as do_launch(), but the algo takes over an existing order
    // (identified by the key, with the given current state) instead of
    // launching a new child.
    void do_launch_attached(IAlgoServices &, OrderKey, ChildState) override;

    // Called when a human (or a supervising algo) updates a parameter
    // of this algo.
    void do_update(api::orderman::Update const &) override;

    // Called when the book for the specified security has changed.
    void on_book(SecurityKey, Book const &) override;

    // Called when our child order reports a change.
    void on_child_change(OrderKey, ChildState const &) override;

private:
    enum class Status;
    struct PImpl;

    // All mutable algo state lives behind this pointer.
    std::unique_ptr<PImpl> me;

    // Services cached at launch time.
    IAlgoServices &algo_services() const;

    // Joins the market if the cached book satisfies the price and size
    // constraints.
    void try_to_join();
};

// The three stages our algo goes through. We start by waiting to
// observe the book for our security (WAITING_FOR_MD). Once we have a
// book we use it to derive our target price and we continue to
// monitor the book to see if we should join the market
// (WAITING_TO_JOIN). Once the size and price constraints are met we
// jump to the target price (JOINED). We remain in the JOINED state
// until our order is filled or canceled.
enum class DynamicJoin::Status { WAITING_FOR_MD, WAITING_TO_JOIN, JOINED }; // All state information needed for this algo struct DynamicJoin::PImpl { IAlgoServices *algo_services; Quantity min_quantity_to_join; Status status = Status::WAITING_FOR_MD; OrderKey child_key = null_order_key(); ChildTemplate *child_template = nullptr; Book book_at_inception; Book last_book; }; DynamicJoin::DynamicJoin() : me(new PImpl()) {} DynamicJoin::~DynamicJoin() {} IAlgoServices &DynamicJoin::algo_services() const { return *me->algo_services; } // This is called outside of the latency sensitive path // once. Expensive initialization should be done here, rather than // in do_launch() OrderOpResult DynamicJoin::do_before_launch(IBeforeLaunchAlgoServices &serv) { auto &def = serv.definition(); bts_assert(def.has_custom_def()); bts_assert(def.custom_def().has_dynamic()); // Validate our parameters, returning an error if we find a // problem. If an error is returned here the order will be // abandoned. auto ¶ms = def.custom_def().dynamic().order_params(); auto mqi = get_if<uint32_t>(params, "Min Quantity"); if (!mqi || mqi.value() == 0) { return OrderOpResult{ ORDER_ERROR_INVALID_ORDER_DEFINITON, "Min Quantity parameter must be defined and non zero"}; } me->min_quantity_to_join = Quantity(mqi.value()); if (!serv.is_attaching()) { // If we are not attaching, create a child template so we can // quickly launch our child when the time comes. // We will be sending a limit order with the same side, // security, trader, account, etc, as this algo order, so just // copy our definition to the child definition to start with. auto child_def = def; // Now add a limit order component to mark our child as a limit // order. Limit orders do not require any extra parameters // beyond those already inherited from this algo order // (i.e. beyond what we obtained through serv.definition()) so // we can just use a default constructed LimitOrderDefinition. 
*child_def.mutable_custom_def()->mutable_limit() = LimitOrderDefinition{}; // Now just build the child template and cache it for future use me->child_template = &(serv.make_child_template(child_def)); } else { // If we are attaching to an existing order then we do not need // to spawn a new child, so there is no need to setup a child // template } // All is good, return a default constructed OrderOpResult which // indicates success. return {}; } // Called in the latency sensitive path, once, when the algo is first // launched. void DynamicJoin::do_launch(IAlgoServices &services) { // Cache our algo services me->algo_services = &services; LOG(OrderMan, Diag, "Lauching DynamicJoin k: {} ", algo_services().order_key().value()); // Launch our child order me->child_key = services.launch_child(*me->child_template, algo_services().price(), algo_services().quantity()); // Subscribe to market data services.subscribe_book(SecurityKey(algo_services().security().id())); } // Called in the latency sensitive path, once, when the algo is first // launched. In this case the algo is provided with an existing order // it is now responsible for managing. void DynamicJoin::do_launch_attached(IAlgoServices &services, OrderKey attached_order, ChildState st) { // The order was launched and attached to an existing order, with // key 'attached_order' and current state 'st' // Cache our algo services me->algo_services = &services; LOG(OrderMan, Diag, "Lauching DynamicJoin k: {} Attached to existing order: {}", algo_services().order_key().value(), attached_order.value()); // Cache the key of the attached order we will be managing me->child_key = attached_order; // Subscribe to market data services.subscribe_book(SecurityKey(services.security().id())); // Simulate a child change so we can update our own state to // reflect that of the child order we attached to. 
From this point // forward there is no distinction between an algo instance that // spawned a new child order and one that attached to an existing // order. on_child_change(attached_order, st); } // Called when a human (or a supervising algo) updates a parameter for // this algo. The implementation validates the update and depending on // the validation outcome either applies the update or terminates the // algo. void DynamicJoin::do_update(api::orderman::Update const &update) { LOG(OrderMan, Diag, "DynamicJoin k: {} receiving update: {}", algo_services().order_key().value(), utils::pb::to_string(update)); // Validate update if (update.quantity() == 0) { // An invalid update has been detected so we terminate the // algo, signaling an error. Any child orders launched by // this algo will also be terminated. algo_services().terminate_error("Invalid update"); return; } // Updates have two parts, a standard part consisting of price and // size and present in all updates, and an optional part that is // specific to the algo in question. Below we check for the // existance of this optional part and validate if present. 
bool has_new_min_qty = false; if (update.has_custom()) { auto ¶ms = update.custom().dynamic().order_params(); auto mqi = params.find("Min Quantity"); if (mqi == params.end() || mqi->second.as_uint32() == 0) { algo_services().terminate_error( "Minimum quantity to join must be greater than 0"); return; } has_new_min_qty = true; me->min_quantity_to_join = Quantity(mqi->second.as_uint32()); } // Update is good, forward it to our child Update child_update; child_update.set_price(update.price()); child_update.set_quantity(update.quantity()); algo_services().update_child(me->child_key, child_update); // Try to join again if conditions changed if (me->status == Status::WAITING_TO_JOIN && has_new_min_qty) try_to_join(); } // Called when the book for the specified security has changed void DynamicJoin::on_book(SecurityKey, Book const &b) { // Cache the book, so we can reffer to it in the event of a // parameter change that might allow us to join me->last_book = b; LOG(OrderMan, Diag, "DynamicJoin order {} on book. 
BID: {}@{} ASK: {}@{}", algo_services().order_key().value(), b.side(api::SIDE_BUY)[0].size(), b.side(api::SIDE_BUY)[0].price(), b.side(api::SIDE_SELL)[0].size(), b.side(api::SIDE_SELL)[0].price()); switch (me->status) { case Status::WAITING_FOR_MD: { auto other_side = securities::other_side(algo_services().definition().side()); auto &other_side_top = me->last_book.side(other_side)[0]; if (other_side_top.size() > 0) { // First time we get the opposite side market, we will use // this price as the target price for the join me->status = Status::WAITING_TO_JOIN; me->book_at_inception = b; LOG(OrderMan, Diag, "JoinOrder order {} receiving first book: {} Target: {}", algo_services().order_key().value(), me->min_quantity_to_join.value(), other_side_top.price()); try_to_join(); } } break; case Status::JOINED: // Nothing to do at this point break; case Status::WAITING_TO_JOIN: // Check if we reached the target and join if so try_to_join(); break; } } // Given the current book, our target price and our min quantity // parameter, join if appropriate void DynamicJoin::try_to_join() { bts_assert(me->status == Status::WAITING_TO_JOIN); auto my_side = algo_services().definition().side(); auto other_side = securities::other_side(my_side); auto &my_side_psz = me->last_book.side(my_side)[0]; auto &opposite_at_inception_psz = me->book_at_inception.side(other_side)[0]; if (my_side_psz.size() && my_side_psz.size() >= me->min_quantity_to_join.value() && securities::is_equal_or_better( my_side, Price(my_side_psz.price()), Price(opposite_at_inception_psz.price()))) { // Time to update our child to join the market Update child_update; child_update.set_price(my_side_psz.price()); child_update.set_quantity(algo_services().quantity().value()); algo_services().update_child(me->child_key, child_update); me->status = Status::JOINED; LOG(OrderMan, Diag, "DynamicJoin order {} joined: {}@{}", algo_services().order_key().value(), algo_services().quantity().value(), my_side_psz.price()); } else { 
LOG(OrderMan, Diag, "JoinOrder order {} still waiting to join. Order: {}@{} Top of " "the book: {}@{} Target: {}@{}", algo_services().order_key().value(), algo_services().quantity().value(), algo_services().price().value(), my_side_psz.size(), my_side_psz.price(), me->min_quantity_to_join.value(), opposite_at_inception_psz.price()); } } // Our child reported a change: void DynamicJoin::on_child_change(OrderKey, ChildState const &st) { // Our fill quantity and price is always a reflection of our // child's state algo_services().update_fill_state(st.total_fills, st.last_price); switch (st.status) { case ChildStatus::WORKING: // Child order is still working, nothing to do return; case ChildStatus::COMPLETED: // Child order completed, we should also complete algo_services().complete(); return; case ChildStatus::CANCELED: // Child order unexpectedly canceled, signal an error and // terminate algo_services().terminate_error("Unexpected child cancel"); return; } } // This is called by the spark daemon at startup. In this call we // describe our algo's parameters and register a factory function to // create an instance of the algo. extern "C" void init_spark_plugin(bts::SparkServices &serv) { using namespace bts; DynamicOrderDescriptor join_desc; // Start by naming our order type join_desc.set_dynamic_order_type("dyn join"); // Now add a single unsigned int parameter, Min Quantity, that will // determine the minimum quantity that must exist in the book in // order for us to join the market. *join_desc.add_param_desc() = param_uint32("Min Quantity"); // Associate the descriptor with a builder function and register // that with the Spark server. auto result = serv.register_dynamic_algo_order( join_desc, []() { return std::make_unique<DynamicJoin>(); }); if (result != bts::AlgoRegistrationResult::SUCCESS) { // The join_desc has errors // Check the error log for more details } } } // namespace order_example } // namespace bts