ROS 2 Subscription Architecture
Background
The gateway’s original NativeTopicSampler created a short-lived
rclcpp::GenericSubscription for every /data sample call. Under concurrent
HTTP load, multiple cpp-httplib handler threads invoked
Node::create_generic_subscription() on the same node, racing inside rcl’s
internal hash map and corrupting its linked entries. The symptom was a Rolling
SIGSEGV in test_data_read (issue #375) and intermittent non-deterministic
crashes on sample-heavy deployments.
This document describes the replacement architecture: a single-writer subscription executor, a RAII slot handle, and a per-topic pool.
Goals
- Eliminate the rcl mutation race by serializing all subscription / callback group creation on one worker thread.
- Amortize subscription cost for hot topics - subscribe once per topic, serve many samples from the cached latest message.
- Stay embedded-friendly: no threads beyond the executor’s worker and aux threads, bounded queues and pools, deterministic shutdown bounded by the longest in-flight task.
- Preserve the SOVD REST contract; the changes are entirely below the handler layer.
Layering
+-----------------------------+ +----------------------------+
| HTTP handlers (data, etc.) | | DiscoveryManager, etc. |
+-----------------------------+ +----------------------------+
| TopicDataProvider& (pure C++ interface, no rclcpp)
v
+----------------------------------------------------------------+
| Ros2TopicDataProvider (data domain, ROS 2-specific) |
| per-topic pool, LRU, idle sweep, graph-change eviction |
+----------------------------------------------------------------+
| Ros2SubscriptionSlot (RAII handle)
v
+----------------------------------------------------------------+
| Ros2SubscriptionExecutor (ros2_common, shared infra) |
| one worker thread, bounded queue, watchdog, graph polling |
+----------------------------------------------------------------+
| rclcpp::Node (dedicated "_sub" subscription node)
v
+----------------------------------------------------------------+
| rclcpp / rcl |
+----------------------------------------------------------------+
Handlers and business managers depend only on TopicDataProvider (include/ros2_medkit_gateway/data/topic_data_provider.hpp). No #include <rclcpp/...> is required to consume the interface. Ros2TopicDataProvider is the statically-linked default implementation. Alternate transports (Zenoh, XRCE-DDS, mocks) can plug in with a different class implementing the same interface.
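A hypothetical sketch of the interface’s shape (the real header may differ in naming and return types; nlohmann::json and tl::expected are assumed here because JsonSerializer and tl::expected appear elsewhere in this design):

#include <chrono>
#include <string>
#include <nlohmann/json.hpp>
#include <tl/expected.hpp>

// Pure data-access interface: note the absence of any rclcpp include.
class TopicDataProvider
{
public:
  virtual ~TopicDataProvider() = default;

  // Latest sample of `topic` as JSON, waiting up to `timeout` on a cold topic;
  // a structured error string (e.g. the cold-wait-cap violation) on failure.
  virtual tl::expected<nlohmann::json, std::string> sample(
    const std::string & topic, std::chrono::milliseconds timeout) = 0;
};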
Ros2SubscriptionExecutor
Owns:
- One dedicated std::thread (the worker).
- One rclcpp::Node (the subscription node, suffixed _sub) exclusively owned by this executor’s internal SingleThreadedExecutor. The gateway’s main MultiThreadedExecutor never sees this node; creation, destruction and callback dispatch of every subscription on it run on the single worker thread, preserving the single-writer invariant against rcl’s hash map.
- A bounded task queue guarded by queue_mtx_ + queue_cv_.
- One aux thread driving the watchdog and graph-event polling ticks on their own cadence. The aux thread only touches atomics and the subscription node’s graph event; it never mutates the node’s subscription list, so it cannot race the worker’s create / destroy path.
Public API (summary):
- run_sync<R>(std::function<R()>, deadline) - execute on the worker, wait with deadline, return tl::expected<R, string>.
- post(std::function<void()>) - fire-and-forget. Returns false when the queue is at capacity (backpressure) or the executor is shutting down.
- on_graph_change(cb) / remove_graph_change(token) - register / deregister callbacks that fire on the worker when the ROS 2 graph changes. Pre-allocated slot array (16 slots, Tier 1 memory guarantee).
- is_shutting_down() / stats() - observability.
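A usage sketch against the signatures summarized above; sub_exec and sub_node, the topic, type, QoS and deadline values are illustrative, and the exact serialized-message callback signature of create_generic_subscription varies slightly across distros:

// Serialize subscription creation onto the worker and wait at most 2 s for it.
auto sub = sub_exec->run_sync<rclcpp::GenericSubscription::SharedPtr>(
  [&]() {
    return sub_node->create_generic_subscription(
      "/vehicle/status", "std_msgs/msg/String", rclcpp::QoS(1),
      [](std::shared_ptr<rclcpp::SerializedMessage> msg) { /* cache msg */ });
  },
  std::chrono::seconds(2));
if (!sub) {
  // Deadline missed or executor shutting down; sub.error() carries the reason.
}

// Fire-and-forget: returns false if the bounded queue rejected the task.
bool accepted = sub_exec->post([]() { /* cheap bookkeeping on the worker */ });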
Graph change detection uses the public rclcpp::Node::get_graph_event() API
and a wall timer polling event->check_and_clear() on the subscription
node. Internal GraphListener is intentionally avoided - it is not a stable
public API across Humble / Jazzy / Rolling.
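A minimal sketch of that polling loop, assuming a 500 ms tick and an illustrative graph_change_dispatcher callable (the actual cadence and dispatch path live inside Ros2SubscriptionExecutor):

// Both calls are public rclcpp API: get_graph_event() on the node and
// check_and_clear() on the returned rclcpp::Event.
auto graph_event = sub_node->get_graph_event();
auto graph_poll_timer = sub_node->create_wall_timer(
  std::chrono::milliseconds(500),
  [graph_event, dispatch = graph_change_dispatcher]() {
    if (graph_event->check_and_clear()) {
      dispatch();  // graph changed since the last tick: run registered callbacks on the worker
    }
  });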
Shutdown drains the queue: the worker processes any already-enqueued tasks so
pending run_sync promises are fulfilled before the worker exits. Bounded
by the longest in-flight task (individual run_sync deadlines cap worst
case).
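A sketch of the worker’s drain-on-shutdown loop, assuming members queue_, queue_mtx_, queue_cv_ and an atomic shutting_down_ flag (names beyond those mentioned above are illustrative):

// Keep executing until shutdown is requested AND the queue is empty, so promises
// behind already-enqueued run_sync tasks are always fulfilled before exit.
while (true) {
  std::function<void()> task;
  {
    std::unique_lock<std::mutex> lock(queue_mtx_);
    queue_cv_.wait(lock, [&] { return !queue_.empty() || shutting_down_.load(); });
    if (queue_.empty()) {
      break;  // shutting down with nothing left to drain
    }
    task = std::move(queue_.front());
    queue_.pop_front();
  }
  task();  // may be a run_sync wrapper fulfilling its promise, or a posted task
}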
Ros2SubscriptionSlot
RAII handle returned by create_typed<MessageT> or create_generic. On
destruction:
- If the executor has entered shutdown (is_shutting_down()), the slot releases its SubscriptionBase::SharedPtr on the current thread. The subscription node’s rcl_node_fini during executor teardown cleans up the rcl handle atomically. No run_sync call, no hang.
- Otherwise, the slot posts a destroy task via run_sync with a 30 s deadline. On timeout, the slot logs an ERROR and releases on the current thread anyway - we prefer a narrow rcl race risk over hanging gateway shutdown.
release_without_executor is not exposed publicly. The destructor makes the
decision internally; shrinking the public surface is intentional for future
safety analysis.
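A sketch of that destructor decision, assuming members executor_ and subscription_ and the 30 s deadline stated above; the timeout fallback is compressed into a comment here:

Ros2SubscriptionSlot::~Ros2SubscriptionSlot()
{
  if (!subscription_) {
    return;
  }
  if (executor_->is_shutting_down()) {
    // Executor teardown will rcl_node_fini the subscription node; just drop the
    // reference on the current thread - no run_sync, no hang.
    subscription_.reset();
    return;
  }
  // Move the last reference into a destroy task so the rcl handle is torn down
  // on the worker thread (single-writer invariant).
  auto destroyed = executor_->run_sync<bool>(
    [sub = std::move(subscription_)]() mutable { sub.reset(); return true; },
    std::chrono::seconds(30));
  if (!destroyed) {
    // Deadline missed: the real implementation logs an ERROR and then releases
    // on the current thread rather than hanging gateway shutdown.
    RCLCPP_ERROR(rclcpp::get_logger("ros2_subscription_slot"),
      "subscription destroy timed out: %s", destroyed.error().c_str());
  }
}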
Ros2TopicDataProvider (pool)
Per-entry structure:
struct PoolEntry {
std::string topic;
std::string cached_type;
std::unique_ptr<Ros2SubscriptionSlot> slot;
std::mutex buf_mtx; // protects latest, latest_ns
std::condition_variable buf_cv;
std::optional<rclcpp::SerializedMessage> latest;
std::int64_t latest_ns;
std::chrono::steady_clock::time_point last_sample_time;
std::atomic<bool> shutdown;
};
sample(topic, timeout) flow:
1. Graph query for publisher count / type. If no publishers, return metadata-only immediately.
2. Pool lookup under pool_mtx_. Hit: promote the topic to the MRU end of the LRU list and return the shared entry. Miss: build the entry shell, create the slot via Ros2SubscriptionSlot::create_generic (this goes through the executor’s run_sync), then insert under pool_mtx_. If the pool is at cap, evict the LRU entry first.
3. Lock entry->buf_mtx. If latest is present, deserialize and return. If cold and concurrent_cold_waits_ >= cold_wait_cap, return tl::unexpected with ERR_X_MEDKIT_COLD_WAIT_CAP_EXCEEDED (HTTP 503, retryable with backoff). Otherwise wait on buf_cv up to timeout.
4. Deserialize the latest message via JsonSerializer and return.
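An illustrative condensation of steps 3-4, assuming sample() produces a tl::expected<nlohmann::json, std::string>, concurrent_cold_waits_ is a std::atomic<int>, and that cold_wait_cap_, metadata_only_response() and the serializer call are stand-ins for the real names:

std::unique_lock<std::mutex> lock(entry->buf_mtx);
if (!entry->latest) {
  // Cold topic: fail fast if too many handler threads are already parked here.
  if (concurrent_cold_waits_.fetch_add(1) >= cold_wait_cap_) {
    concurrent_cold_waits_.fetch_sub(1);
    return tl::unexpected(std::string{"ERR_X_MEDKIT_COLD_WAIT_CAP_EXCEEDED"});
  }
  entry->buf_cv.wait_for(lock, timeout, [&] {
    return entry->latest.has_value() || entry->shutdown.load();
  });
  concurrent_cold_waits_.fetch_sub(1);
}
if (!entry->latest) {
  return metadata_only_response();  // nothing arrived within the timeout
}
return serializer_->to_json(*entry->latest, entry->cached_type);  // step 4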
The callback registered with the slot captures a
std::weak_ptr<PoolEntry>. If the entry is evicted, the callback no-ops
safely rather than touching freed memory.
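A sketch of that callback wiring, assuming the slot accepts a serialized-message callback and that a wall-clock receive timestamp feeds latest_ns (both details are illustrative):

std::weak_ptr<PoolEntry> weak_entry = entry;
auto on_message = [weak_entry](std::shared_ptr<rclcpp::SerializedMessage> msg) {
  auto locked = weak_entry.lock();
  if (!locked) {
    return;  // entry already evicted: no-op instead of touching freed memory
  }
  {
    std::lock_guard<std::mutex> guard(locked->buf_mtx);
    locked->latest = *msg;
    locked->latest_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
      std::chrono::system_clock::now().time_since_epoch()).count();
  }
  locked->buf_cv.notify_all();  // wake sample() threads blocked on a cold topic
};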
Eviction
Four independent eviction triggers keep the pool size bounded:
- Graph change - topics that disappeared from the graph or whose type changed are evicted on the next graph event. Worker-scheduled callback.
- LRU - at max_pool_size, a new miss evicts the least-recently-used entry.
- Idle safety net - a wall timer (every idle_sweep_tick, default 1 min) drops entries whose last_sample_time is older than idle_safety_net (default 15 min).
- Shutdown - the provider destructor signals every per-entry CV, drops the pool map + LRU structures, then releases each slot.
All four paths release slots outside pool_mtx_ so the slot destructor’s
run_sync cannot deadlock against a sample() thread waiting on the pool
mutex.
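A sketch of the ordering shared by the eviction paths, with illustrative container names (pool_, lru_):

std::unique_ptr<Ros2SubscriptionSlot> doomed_slot;
{
  std::lock_guard<std::mutex> lock(pool_mtx_);
  auto victim = lru_.front();              // least-recently-used entry
  victim->shutdown = true;
  victim->buf_cv.notify_all();             // release any sampler parked on this entry
  doomed_slot = std::move(victim->slot);   // detach while still holding pool_mtx_
  pool_.erase(victim->topic);
  lru_.pop_front();
}
// pool_mtx_ is no longer held, so the slot destructor's run_sync cannot deadlock
// against a sample() thread that is blocked acquiring the pool mutex.
doomed_slot.reset();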
Cold-wait cap
cpp-httplib serves requests on a bounded worker pool (typically 4-8 threads).
Without protection, N concurrent cold-topic samples can saturate that pool and
starve /health, /components, and other endpoints.
concurrent_cold_waits_ tracks how many sample calls are currently blocked
on a cold entry’s CV. When the count reaches cold_wait_cap (default 4),
further cold callers fail fast with ERR_X_MEDKIT_COLD_WAIT_CAP_EXCEEDED
(HTTP 503, params.cold_wait_cap set, retry with backoff). Existing waiters
complete normally; warm topics are unaffected. Returning a structured error
instead of a metadata-only success lets clients distinguish back-pressure
(“retry me”) from “no publishers yet” (“nothing to wait for”).
QoS matching
The pool queries the publisher QoS via
get_publishers_info_by_topic on each miss and derives a subscriber QoS
that is compatible:
- Reliable if any publisher is Reliable, else BestEffort.
- TransientLocal if any publisher is TransientLocal, else Volatile. (The TransientLocal case matters for latched status topics - a volatile subscriber would miss the latched last value.)
- keep_last depth 1 always. The pool only keeps the newest serialized message; deeper queues would just allocate and discard.
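A sketch of that derivation using the public rclcpp graph API; the helper name derive_subscriber_qos is illustrative:

rclcpp::QoS derive_subscriber_qos(rclcpp::Node & node, const std::string & topic)
{
  rclcpp::QoS qos(rclcpp::KeepLast(1));  // depth 1: only the newest message is cached
  qos.best_effort();
  qos.durability_volatile();
  for (const auto & info : node.get_publishers_info_by_topic(topic)) {
    if (info.qos_profile().reliability() == rclcpp::ReliabilityPolicy::Reliable) {
      qos.reliable();           // any Reliable publisher -> subscribe Reliable
    }
    if (info.qos_profile().durability() == rclcpp::DurabilityPolicy::TransientLocal) {
      qos.transient_local();    // pick up latched last values (e.g. status topics)
    }
  }
  return qos;
}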
Main assembly
main.cpp constructs the chain after the gateway node is added to the
executor:
auto node = std::make_shared<GatewayNode>();
rclcpp::executors::MultiThreadedExecutor executor;
executor.add_node(node);
auto sub_exec = std::make_shared<Ros2SubscriptionExecutor>(node);
auto serializer = std::make_shared<JsonSerializer>();
auto data_provider = std::make_shared<Ros2TopicDataProvider>(sub_exec, serializer);
node->set_topic_data_provider(data_provider);
executor.spin();
data_provider.reset(); // drops pool entries through sub_exec
sub_exec.reset(); // joins worker + aux, destroys subscription node
executor.remove_node(node); // detach before ~GatewayNode runs
node.reset(); // ~GatewayNode with executor still alive
rclcpp::shutdown();
remove_node + the explicit node.reset() are required on Rolling / newer
Jazzy: rclcpp aborts with Node ... needs to be associated with an executor
if ~GatewayNode touches a service client after stack unwinding has already
destroyed the executor.
Regression gate
scripts/check_no_naked_subscriptions.sh fails CI if any .cpp or
.hpp outside ros2_common/ or test/ introduces a new call to
create_generic_subscription, create_subscription<, or
create_callback_group. New production code must route through
Ros2SubscriptionSlot.
Known-legacy call sites (fault subscribers, operation manager, log manager, trigger_topic_subscriber) are explicitly allowlisted and tracked for migration under Phase C of the provider architecture rollout.
Observability
GET /health includes two vendor-extension sections populated from atomic
reads only (never blocks):
- x-medkit-subscription-executor - worker_alive, degraded, queue depth / dropped, tasks completed / failed, latency, watchdog trips.
- x-medkit-data-provider - pool_size / cap, pool hits / misses, evictions, graph_events_received, concurrent_cold_waits.
External monitors (k8s liveness probe, Docker HEALTHCHECK, systemd watchdog)
trigger on x-medkit-subscription-executor.degraded without deployment-
specific assumptions.