Skip to main content
Version: master

Subscriptions

Subscription Queries

Graphql subscriptions allow you subscribe to a reactive source and as new data arrives a graphql query is applied over that data and the results are passed on.

See http://graphql.org/blog/subscriptions-in-graphql-and-relay/ for more general details on graphql subscriptions.

Imagine you have an stock market pricing service and you make a graphql subscription to it like this

subscription StockCodeSubscription {
stockQuotes(stockCode:"IBM") {
dateTime
stockCode
stockPrice
stockPriceChange
}
}

graphql subscriptions allow a stream of ExecutionResult objects to be sent down each time the stock price changes. The field selection set will applied to the underlying data and are represented just like any other graphql query.

What is special is that the initial result of a subscription query is a reactive-streams Publisher object which you need to use to get the future values.

You need to use SubscriptionExecutionStrategy as your execution strategy as it has the support for the reactive-streams APIs.

GraphQL graphQL = GraphQL
.newGraphQL(schema)
.subscriptionExecutionStrategy(new SubscriptionExecutionStrategy())
.build();

ExecutionResult executionResult = graphQL.execute(query);

Publisher<ExecutionResult> stockPriceStream = executionResult.getData();

The Publisher<ExecutionResult> here is the publisher of a stream of events. You need to subscribe to this with your processing code which will look something like the following

GraphQL graphQL = GraphQL
.newGraphQL(schema)
.subscriptionExecutionStrategy(new SubscriptionExecutionStrategy())
.build();

String query = "" +
" subscription StockCodeSubscription {\n" +
" stockQuotes(stockCode:\"IBM\") {\n" +
" dateTime\n" +
" stockCode\n" +
" stockPrice\n" +
" stockPriceChange\n" +
" }\n" +
" }\n";

ExecutionResult executionResult = graphQL.execute(query);

Publisher<ExecutionResult> stockPriceStream = executionResult.getData();

AtomicReference<Subscription> subscriptionRef = new AtomicReference<>();
stockPriceStream.subscribe(new Subscriber<ExecutionResult>() {

@Override
public void onSubscribe(Subscription s) {
subscriptionRef.set(s);
s.request(1);
}

@Override
public void onNext(ExecutionResult er) {
//
// process the next stock price
//
processStockPriceChange(er.getData());

//
// ask the publisher for one more item please
//
subscriptionRef.get().request(1);
}

@Override
public void onError(Throwable t) {
//
// The upstream publishing data source has encountered an error
// and the subscription is now terminated. Real production code needs
// to decide on a error handling strategy.
//
}

@Override
public void onComplete() {
//
// the subscription has completed. There is not more data
//
}
});

You are now writing reactive-streams code to consume a series of ExecutionResults. You can see more details on reactive-streams code here http://www.reactive-streams.org/

RxJava is a popular implementation of reactive-streams. Check out http://reactivex.io/intro.html to find out more about creating Publishers of data and Subscriptions to that data.

Project Reactor is another popular implementation of reactive-streams. Check out https://projectreactor.io/ as well.

graphql-java only produces a stream of results. It does not concern itself with sending these over the network on things like web sockets and so on. That is important but not a concern of the base graphql-java library.

We have put together a basic example of using websockets (backed by Jetty) with a simulated stock price application that is built using RxJava.

See https://github.com/graphql-java/graphql-java-subscription-example for more detailed code on handling network concerns and the like.

Subscription Data Fetchers

The DataFetcher behind a subscription field is responsible for creating the Publisher of data. The objects return by this Publisher will be mapped over the graphql query as each arrives and then sent back out as an execution result.

You data fetcher is going to look something like this.

DataFetcher<Publisher<StockInfo>> publisherDataFetcher = new DataFetcher<Publisher<StockInfo>>() {
@Override
public Publisher<StockInfo> get(DataFetchingEnvironment environment) {
String stockCodeArg = environment.getArgument("stockCode");
return buildPublisherForStockCode(stockCodeArg);
}
};

Now the exact details of how you get that stream of events is up to you and your reactive code. graphql-java gives you a way to map the graphql query fields over that stream of objects just like a standard graphql query.