Subscriptions
Using Subscriptions for reactive data
Subscriptions open a persistent event stream to the server. Unlike queries and mutations, a subscription emits multiple values over time and must be consumed reactively via .subscribe().
Subscribe() returns a Subscription object, keeping the GraphQL subscription alive. To unsubscribe, simply call
.Unsubscribe(). When the Subscription is deleted, it is also unsubscribed.
Subscription sub = client.Subscribe({
.query = R"(
subscription OnMessageAdded {
messageAdded {
id
text
author
}
}
)"
}).subscribe(
[](const GraphQLResponse& event) {
std::cout << event.data->dump(2) << "\n";
},
[](std::exception_ptr) { /* handle stream error */ }
);
// Later: cancel the subscription
sub.Unsubscribe();
Choosing a transport
To enable reactive data from subscriptions, it is required to have a stateful connection like WebSocket or Server-Sent Events (SSE).
In gqlxy-client, you can use one or both HttpLink or WsLink. By default, HttpLink will use SSE for subscriptions.
If you want, you can also use both with a SplitLink to route subscription operations to WebSockets.
#include <gqlxy/links/http_link.h>
#include <gqlxy/links/ws_link.h>
#include <gqlxy/links/split_link.h>
auto client = gqlxy::Client({
.link = make_shared<SplitLink>(
[](const GraphQLRequest& req) {
// When true, the request will use the first link.
// Otherwise, it will use the second one.
return req.type != parser::OperationType::SUBSCRIPTION;
},
make_shared<HttpLink>(HttpLinkOptions{
.url = "http://localhost:4000/graphql"
}),
make_shared<WsLink>(WsLinkOptions{
.url = "ws://localhost:4000/graphql"
})
),
.cache = make_shared<InMemoryCache>()
});
Handling completion
When using .subscribe, you can also have a third callback to handle completion.
Subscription sub = client.Subscribe({.query = "subscription { heartbeat }"})
.subscribe(
[](const GraphQLResponse& r) { /* event */ },
[](std::exception_ptr) { /* error */ },
[]() { /* done */ }
);
Cancelling a subscription
To cancel a subscription, simply call .Unsubscribe() on the Subscription object. Otherwise, the subscription will
automatically be canceled when the object gets deleted.
sub.Unsubscribe();
Subscription variables
Like every other request, you can pass variables to your GraphQL subscriptions.
Subscription sub = client.Subscribe({
.query = R"(
subscription OnReview($bookId: ID!) {
reviewAdded(bookId: $bookId) {
rating
}
}
)",
.variables = {
{"bookId", "42"}
}
}).subscribe(...);
Using RPP operators
Observable<GraphQLResponse> converts implicitly to rpp::dynamic_observable<GraphQLResponse>, giving access to the full RPP operator set:
rpp::dynamic_observable<GraphQLResponse> stream = client.Subscribe({
.query = R"(
subscription {
priceUpdated {
symbol
bid
ask
}
}
)"
});
Subscription sub = (stream
| rpp::operators::filter([](const GraphQLResponse& r) { return r.data.has_value(); })
| rpp::operators::map([](const GraphQLResponse& r) { return (*r.data)["priceUpdated"]; }))
.subscribe([](const nlohmann::json& tick) {
std::cout << format("{} {}/{}", tick["symbol"].get<std::string>(), tick["bid"], tick["ask"]) << std::endl;
});
Error handling
Transport errors (e.g. WebSocket disconnect) are delivered through the on_error callback and terminate the observable.
WsLink performs automatic reconnection at the transport level. A brief disconnect will be recovered transparently without
surfacing an error to the subscription unless the retry budget is exhausted.
GraphQL field errors are delivered as a normal GraphQLResponse with the .errors field. The stream remains open and continues
delivering subsequent events.