MarketDataBus

The MarketDataBus is a publish-subscribe router for market data in the Flox engine.
Multiple components can subscribe independently to candle, trade, and order book updates.

Purpose

The bus decouples market data producers (e.g., connectors) from consumers (e.g., strategies, aggregators), while supporting symbol-specific subscriptions.

Class Definition

class MarketDataBus : public ISubsystem {
public:
  using CandleCallback = std::function<void(SymbolId, const Candle &)>;
  using TradeCallback = std::function<void(const Trade &)>;
  using BookUpdateCallback = std::function<void(const BookUpdate &)>;

  enum class SubscriptionType { Candle, Trade, BookUpdate };

  struct SubscriptionHandle {
    SymbolId symbol;
    SubscriptionType type;
    size_t index;
    bool operator==(const SubscriptionHandle &other) const;
  };

  SubscriptionHandle subscribeToCandles(SymbolId symbol, CandleCallback cb);
  SubscriptionHandle subscribeToTrades(SymbolId symbol, TradeCallback cb);
  SubscriptionHandle subscribeToBookUpdates(SymbolId symbol, BookUpdateCallback cb);

  void unsubscribe(const SubscriptionHandle &handle);

  void onCandle(SymbolId symbol, const Candle &candle);
  void onTrade(const Trade &trade);
  void onBookUpdate(const BookUpdate &update);

  void clear();
  void start() override;
  void stop() override;
};
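
The class only declares SubscriptionHandle::operator==, so its definition is not shown here. A minimal sketch of one plausible definition follows, assuming two handles are equal when symbol, type, and index all match. The SymbolId alias is a hypothetical stand-in for the Flox type.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <tuple>

// Hypothetical stand-in; the real SymbolId is defined elsewhere in Flox.
using SymbolId = std::uint32_t;

enum class SubscriptionType { Candle, Trade, BookUpdate };

struct SubscriptionHandle {
  SymbolId symbol;
  SubscriptionType type;
  std::size_t index;

  // Assumed semantics: field-wise equality over all three members.
  bool operator==(const SubscriptionHandle &other) const {
    return std::tie(symbol, type, index) ==
           std::tie(other.symbol, other.type, other.index);
  }
};

// Convenience wrapper used to show the comparison in isolation.
bool sameSubscription(const SubscriptionHandle &a, const SubscriptionHandle &b) {
  return a == b;
}
```

Under this assumption, handles for the same symbol and slot index but different event types compare as different subscriptions.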

Responsibilities

  • Manages multiple subscribers per symbol and event type
  • Emits incoming data to appropriate callbacks
  • Supports unsubscribe via SubscriptionHandle
  • Thread-safe via internal mutex
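
The responsibilities above can be illustrated with a trade-only stand-in that mirrors the subscribeToTrades, unsubscribe, and onTrade signatures. This is a sketch, not the Flox implementation: TradeBusSketch, the Trade fields, and the SymbolId alias are assumptions, the handle omits the type field, and locking is left out for brevity.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <optional>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical stand-ins; the real Flox types are not shown on this page.
using SymbolId = std::uint32_t;
struct Trade {
  SymbolId symbol;
  double price;
  double quantity;
};

// Trade-only stand-in for the bus. Single-threaded for brevity.
class TradeBusSketch {
public:
  using TradeCallback = std::function<void(const Trade &)>;
  struct SubscriptionHandle {
    SymbolId symbol;
    std::size_t index;
  };

  // Registers a callback for one symbol and returns a handle to it.
  SubscriptionHandle subscribeToTrades(SymbolId symbol, TradeCallback cb) {
    auto &slots = _subs[symbol];
    slots.emplace_back(std::move(cb));
    return {symbol, slots.size() - 1};
  }

  // Empties the slot named by the handle; unknown handles are ignored.
  void unsubscribe(const SubscriptionHandle &h) {
    auto it = _subs.find(h.symbol);
    if (it != _subs.end() && h.index < it->second.size())
      it->second[h.index].reset();
  }

  // Delivers the trade only to callbacks registered for trade.symbol.
  void onTrade(const Trade &trade) {
    auto it = _subs.find(trade.symbol);
    if (it == _subs.end()) return;
    for (const auto &slot : it->second)
      if (slot) (*slot)(trade);
  }

private:
  std::unordered_map<SymbolId, std::vector<std::optional<TradeCallback>>> _subs;
};

// Returns how many trades the subscriber for symbol 1 received.
int countTradesForSymbolOne() {
  TradeBusSketch bus;
  int seen = 0;
  auto handle = bus.subscribeToTrades(1, [&seen](const Trade &) { ++seen; });

  bus.onTrade({1, 100.0, 0.5});  // matching symbol: delivered
  bus.onTrade({2, 200.0, 1.0});  // different symbol: not delivered
  bus.unsubscribe(handle);
  bus.onTrade({1, 101.0, 0.5});  // unsubscribed: not delivered
  return seen;
}
```

Only the first trade reaches the subscriber, since the second is for another symbol and the third arrives after unsubscribe.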

Internal Design

  • Uses a Router struct to hold subscriber lists per symbol
  • Callbacks are stored in std::vector<std::optional<...>>, which gives indexed access and lets a slot be emptied on removal without shifting the entries after it
  • Synchronization is handled with std::mutex
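
The storage scheme described above might look roughly like the following sketch of a single subscriber list. SlotListSketch is a hypothetical name, and invoking callbacks while the mutex is held is an assumption made for simplicity; the actual Router may differ.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <mutex>
#include <optional>
#include <utility>
#include <vector>

// Hypothetical subscriber list for one (symbol, event type) pair.
template <typename Event>
class SlotListSketch {
public:
  using Callback = std::function<void(const Event &)>;

  // Appends a callback and returns its slot index.
  std::size_t add(Callback cb) {
    std::lock_guard<std::mutex> lock(_mutex);
    _slots.emplace_back(std::move(cb));
    return _slots.size() - 1;
  }

  // Empties the slot in place, so indices held elsewhere stay valid.
  void remove(std::size_t index) {
    std::lock_guard<std::mutex> lock(_mutex);
    if (index < _slots.size()) _slots[index].reset();
  }

  // Invokes every live callback while holding the lock (an assumption of
  // this sketch); a callback that calls back into the list would deadlock.
  void emit(const Event &event) {
    std::lock_guard<std::mutex> lock(_mutex);
    for (const auto &slot : _slots)
      if (slot) (*slot)(event);
  }

private:
  std::mutex _mutex;
  std::vector<std::optional<Callback>> _slots;
};

// Removes the first of two subscribers between two emits and returns
// how many events each one received, as {first, second}.
std::pair<int, int> removeFirstKeepSecond() {
  SlotListSketch<int> list;
  int first = 0;
  int second = 0;
  std::size_t a = list.add([&first](const int &) { ++first; });
  std::size_t b = list.add([&second](const int &) { ++second; });

  list.emit(7);    // both callbacks run
  list.remove(a);  // slot 0 is emptied, slot 1 keeps its index
  list.emit(8);    // only the second callback runs
  assert(b == 1);
  return {first, second};
}
```

The trade-off in this layout is that emptied slots are never reclaimed, so a list with heavy subscribe and unsubscribe churn keeps growing until it is cleared.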

Use Cases

  • Strategies subscribing to specific symbols
  • Centralized data distribution for logging, analytics, or aggregation
  • Decoupling modules with dynamic subscription capabilities

Notes

  • Subscriptions are retained until explicitly unsubscribed or clear() is called
  • Callbacks must not hold locks for long periods or throw exceptions
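
One way to follow the last note is to keep the callback itself trivial: catch anything the handler throws and hand the event to a queue that is processed elsewhere. The sketch below assumes this approach; TradeInbox, guarded, and the Trade fields are hypothetical names, not part of Flox.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <stdexcept>
#include <utility>

// Hypothetical stand-in for the Flox trade type.
struct Trade {
  unsigned symbol;
  double price;
  double quantity;
};

// Hand-off queue: the callback only copies the trade in, under a short lock.
class TradeInbox {
public:
  void push(const Trade &t) {
    std::lock_guard<std::mutex> lock(_mutex);
    _pending.push_back(t);
  }

  // Takes everything queued so far, for processing outside the callback.
  std::deque<Trade> drain() {
    std::lock_guard<std::mutex> lock(_mutex);
    std::deque<Trade> out;
    out.swap(_pending);
    return out;
  }

private:
  std::mutex _mutex;
  std::deque<Trade> _pending;
};

// Wraps a handler so that no exception escapes into the caller.
// `errors` must outlive the returned callback.
std::function<void(const Trade &)> guarded(
    std::function<void(const Trade &)> inner, std::size_t &errors) {
  return [inner = std::move(inner), &errors](const Trade &t) {
    try {
      inner(t);
    } catch (...) {
      ++errors;  // count the failure instead of propagating it
    }
  };
}

// Feeds three trades through a guarded callback; the second makes the
// handler throw. Returns {queued trades, caught errors}.
std::pair<std::size_t, std::size_t> demoGuardedHandOff() {
  TradeInbox inbox;
  std::size_t errors = 0;
  auto cb = guarded(
      [&inbox](const Trade &t) {
        if (t.price <= 0.0) throw std::invalid_argument("non-positive price");
        inbox.push(t);
      },
      errors);

  cb({1, 100.0, 1.0});
  cb({1, -1.0, 1.0});
  cb({1, 101.0, 2.0});
  return {inbox.drain().size(), errors};
}
```

A callback built this way could be passed to subscribeToTrades, with a separate worker calling drain() to do the slow work.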