Join

Join starts working N ticks away from the market (usually) and then watches for the opposite inside to become same side inside market; if there is then enough size on that market, it will join.

This sample is part of CQG Custom Algo SDK examples included in the algo_centos8_sdk container.
#include "spark/lib/config/config.h"
#include "spark/lib/config/protos/system.pb.h"
#include "spark/lib/domain_model/Security.h"
#include "spark/lib/orderman/orders/server/IAlgoOrder.h"
#include "spark/lib/orderman/orders/server/IAlgoServices.h"
#include "spark/lib/plugin/SparkServices.h"

#include "spark/lib/orderman/order_factory/IOrderFactory.h"
#include "spark/lib/orderman/orderman_utils/Book.h"
#include "spark/lib/orderman/orderman_utils/orderman_utils.h"
#include "spark/lib/utils/PBUtils.h"

using namespace bts::api::orderman;
using namespace bts::orderman;

namespace bts {
namespace order_example {

// Join starts working N ticks away from the market (usually) and then
// watches for the opposite inside to become same side inside market;
// if there is then enough size on that market, it will join.
class DynamicJoin : public IAlgoOrder {
 public:
   DynamicJoin();
   ~DynamicJoin();

   OrderOpResult do_before_launch(IBeforeLaunchAlgoServices &) override;
   void do_launch(IAlgoServices &) override;
   void do_launch_attached(IAlgoServices &, OrderKey, ChildState) override;
   void do_update(api::orderman::Update const &) override;
   void on_book(SecurityKey, Book const &) override;
   void on_child_change(OrderKey, ChildState const &) override;

 private:
   enum class Status;
   struct PImpl;
   std::unique_ptr<PImpl> me;

   IAlgoServices &algo_services() const;
   void try_to_join();
};

// The three stages our algo goes through. We start by waiting to
// observe the book for our security (WAITING_FOR_MD). Once we have a
// book we use it to derive our target price and we continue to
// monitor the book to see if we should join the market
// (WAITING_TO_JOIN). Once the size and price constraints are met we
// jump to the target price (JOINED). We remain in the JOINED state
// until our order is filled or canceled.
enum class DynamicJoin::Status { WAITING_FOR_MD, WAITING_TO_JOIN, JOINED };

// All state information needed for this algo
struct DynamicJoin::PImpl {
   IAlgoServices *algo_services;
   Quantity min_quantity_to_join;
   Status status = Status::WAITING_FOR_MD;
   OrderKey child_key = null_order_key();
   ChildTemplate *child_template = nullptr;
   Book book_at_inception;
   Book last_book;
};

DynamicJoin::DynamicJoin() : me(new PImpl()) {}
DynamicJoin::~DynamicJoin() {}
IAlgoServices &DynamicJoin::algo_services() const { return *me->algo_services; }

// This is called outside of the latency sensitive path
// once. Expensive initialization should be done here, rather than 
// in do_launch()
OrderOpResult DynamicJoin::do_before_launch(IBeforeLaunchAlgoServices &serv) {
   auto &def = serv.definition();
   bts_assert(def.has_custom_def());
   bts_assert(def.custom_def().has_dynamic());

   // Validate our parameters, returning an error if we find a
   // problem.  If an error is returned here the order will be
   // abandoned.
   auto &params = def.custom_def().dynamic().order_params();
   auto mqi = get_if<uint32_t>(params, "Min Quantity");
   if (!mqi || mqi.value() == 0) {
      return OrderOpResult{
          ORDER_ERROR_INVALID_ORDER_DEFINITON,
          "Min Quantity parameter must be defined and non zero"};
   }

   me->min_quantity_to_join = Quantity(mqi.value());

   if (!serv.is_attaching()) {
      // If we are not attaching, create a child template so we can
      // quickly launch our child when the time comes.

      // We will be sending a limit order with the same side,
      // security, trader, account, etc, as this algo order, so just
      // copy our definition to the child definition to start with.
      auto child_def = def;
      // Now add a limit order component to mark our child as a limit
      // order. Limit orders do not require any extra parameters
      // beyond those already inherited from this algo order
      // (i.e. beyond what we obtained through serv.definition()) so
      // we can just use a default constructed LimitOrderDefinition.
      *child_def.mutable_custom_def()->mutable_limit() = LimitOrderDefinition{};
      // Now just build the child template and cache it for future use
      me->child_template = &(serv.make_child_template(child_def));
   } else {
      // If we are attaching to an existing order then we do not need
      // to spawn a new child, so there is no need to setup a child
      // template
   }

   // All is good, return a default constructed OrderOpResult which
   // indicates success.
   return {};
}

// Called in the latency sensitive path, once, when the algo is first
// launched.
void DynamicJoin::do_launch(IAlgoServices &services) {
   // Cache our algo services
   me->algo_services = &services;

   LOG(OrderMan, Diag, "Lauching DynamicJoin k: {} ",
       algo_services().order_key().value());

   // Launch our child order
   me->child_key =
       services.launch_child(*me->child_template, algo_services().price(),
                             algo_services().quantity());

   // Subscribe to market data
   services.subscribe_book(SecurityKey(algo_services().security().id()));
}

// Called in the latency sensitive path, once, when the algo is first
// launched. In this case the algo is provided with an existing order
// it is now responsible for managing.
void DynamicJoin::do_launch_attached(IAlgoServices &services,
                                     OrderKey attached_order, ChildState st) {
   // The order was launched and attached to an existing order, with
   // key 'attached_order' and current state 'st'

   // Cache our algo services
   me->algo_services = &services;

   LOG(OrderMan, Diag,
       "Lauching DynamicJoin k: {} Attached to existing order: {}",
       algo_services().order_key().value(), attached_order.value());

   // Cache the key of the attached order we will be managing
   me->child_key = attached_order;

   // Subscribe to market data
   services.subscribe_book(SecurityKey(services.security().id()));

   // Simulate a child change so we can update our own state to
   // reflect that of the child order we attached to. From this point
   // forward there is no distinction between an algo instance that
   // spawned a new child order and one that attached to an existing
   // order.
   on_child_change(attached_order, st);
}

// Called when a human (or a supervising algo) updates a parameter for
// this algo. The implementation validates the update and depending on
// the validation outcome either applies the update or terminates the
// algo.
void DynamicJoin::do_update(api::orderman::Update const &update) {
   LOG(OrderMan, Diag, "DynamicJoin k: {} receiving update: {}",
       algo_services().order_key().value(), utils::pb::to_string(update));

   // Validate update
   if (update.quantity() == 0) {
      // An invalid update has been detected so we terminate the
      // algo, signaling an error. Any child orders launched by
      // this algo will also be terminated.
      algo_services().terminate_error("Invalid update");
      return;
   }

   // Updates have two parts, a standard part consisting of price and
   // size and present in all updates, and an optional part that is
   // specific to the algo in question. Below we check for the
   // existance of this optional part and validate if present.
   bool has_new_min_qty = false;
   if (update.has_custom()) {
      auto &params = update.custom().dynamic().order_params();
      auto mqi = params.find("Min Quantity");

      if (mqi == params.end() || mqi->second.as_uint32() == 0) {
         algo_services().terminate_error(
             "Minimum quantity to join must be greater than 0");
         return;
      }
      has_new_min_qty = true;
      me->min_quantity_to_join = Quantity(mqi->second.as_uint32());
   }

   // Update is good, forward it to our child
   Update child_update;
   child_update.set_price(update.price());
   child_update.set_quantity(update.quantity());
   algo_services().update_child(me->child_key, child_update);

   // Try to join again if conditions changed
   if (me->status == Status::WAITING_TO_JOIN && has_new_min_qty)
      try_to_join();
}

// Called when the book for the specified security has changed
void DynamicJoin::on_book(SecurityKey, Book const &b) {

   // Cache the book, so we can reffer to it in the event of a
   // parameter change that might allow us to join
   me->last_book = b;

   LOG(OrderMan, Diag, "DynamicJoin order {} on book. BID: {}@{} ASK: {}@{}",
       algo_services().order_key().value(), b.side(api::SIDE_BUY)[0].size(),
       b.side(api::SIDE_BUY)[0].price(), b.side(api::SIDE_SELL)[0].size(),
       b.side(api::SIDE_SELL)[0].price());

   switch (me->status) {
   case Status::WAITING_FOR_MD: {

      auto other_side =
          securities::other_side(algo_services().definition().side());
      auto &other_side_top = me->last_book.side(other_side)[0];

      if (other_side_top.size() > 0) {
         // First time we get the opposite side market, we will use
         // this price as the target price for the join
         me->status = Status::WAITING_TO_JOIN;
         me->book_at_inception = b;
         LOG(OrderMan, Diag,
             "JoinOrder order {} receiving first book: {} Target: {}",
             algo_services().order_key().value(),
             me->min_quantity_to_join.value(), other_side_top.price());
         try_to_join();
      }
   } break;
   case Status::JOINED:
      // Nothing to do at this point
      break;
   case Status::WAITING_TO_JOIN:
      // Check if we reached the target and join if so
      try_to_join();
      break;
   }
}

// Given the current book, our target price and our min quantity
// parameter, join if appropriate
void DynamicJoin::try_to_join() {
   bts_assert(me->status == Status::WAITING_TO_JOIN);

   auto my_side = algo_services().definition().side();
   auto other_side = securities::other_side(my_side);
   auto &my_side_psz = me->last_book.side(my_side)[0];
   auto &opposite_at_inception_psz = me->book_at_inception.side(other_side)[0];

   if (my_side_psz.size() &&
       my_side_psz.size() >= me->min_quantity_to_join.value() &&
       securities::is_equal_or_better(
           my_side, Price(my_side_psz.price()),
           Price(opposite_at_inception_psz.price()))) {

      // Time to update our child to join the market
      Update child_update;
      child_update.set_price(my_side_psz.price());
      child_update.set_quantity(algo_services().quantity().value());
      algo_services().update_child(me->child_key, child_update);
      me->status = Status::JOINED;

      LOG(OrderMan, Diag, "DynamicJoin order {} joined: {}@{}",
          algo_services().order_key().value(),
          algo_services().quantity().value(), my_side_psz.price());

   } else {
      LOG(OrderMan, Diag,
          "JoinOrder order {} still waiting to join. Order: {}@{} Top of "
          "the book: {}@{} Target: {}@{}",
          algo_services().order_key().value(),
          algo_services().quantity().value(), algo_services().price().value(),
          my_side_psz.size(), my_side_psz.price(),
          me->min_quantity_to_join.value(), opposite_at_inception_psz.price());
   }
}

// Our child reported a change:
void DynamicJoin::on_child_change(OrderKey, ChildState const &st) {
   // Our fill quantity and price is always a reflection of our
   // child's state
   algo_services().update_fill_state(st.total_fills, st.last_price);

   switch (st.status) {
   case ChildStatus::WORKING:
      // Child order is still working, nothing to do
      return;
   case ChildStatus::COMPLETED:
      // Child order completed, we should also complete
      algo_services().complete();
      return;
   case ChildStatus::CANCELED:
      // Child order unexpectedly canceled, signal an error and
      // terminate
      algo_services().terminate_error("Unexpected child cancel");
      return;
   }
}

// This is called by the spark daemon at startup. In this call we
// describe our algo's parameters and register a factory function to
// create an instance of the algo.
extern "C" void init_spark_plugin(bts::SparkServices &serv) {
   using namespace bts;

   DynamicOrderDescriptor join_desc;

   // Start by naming our order type
   join_desc.set_dynamic_order_type("dyn join");

   // Now add a single unsigned int parameter, Min Quantity, that will
   // determine the minimum quantity that must exist in the book in
   // order for us to join the market.
   *join_desc.add_param_desc() = param_uint32("Min Quantity");

   // Associate the descriptor with a builder function and register
   // that with the Spark server.
   auto result = 
      serv.register_dynamic_algo_order(
       join_desc, []() { return std::make_unique<DynamicJoin>(); });

   if (result != bts::AlgoRegistrationResult::SUCCESS) {
      // The join_desc has errors
      // Check the error log for more details
   }
}

} // namespace order_example
} // namespace bts

Previous