Another very interesting feature that will most likely be available with Java 21 is the Structured Concurrency API, developed under JEP 428 by Project Loom. Structured concurrency greatly simplifies the use of virtual threads and provides an API, similar to the well-known ExecutorService API, for executing Callables. If you haven’t heard about virtual threads yet, check out this blog post, which explains the pros and cons of virtual vs. platform threads. Structured concurrency is currently available as an incubator feature in Java 19.
The centerpiece of the new API is the class StructuredTaskScope, which starts a virtual thread to execute a given Callable. In exchange, you get a Future that provides access to the result of the Callable once it has finished. Here is a simple example of StructuredTaskScope in action:
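try (var scope = new StructuredTaskScope<String>()) {
    Callable<String> callable = () -> "Running ... Finished!";
    // fork starts the Callable in a new virtual thread
    Future<String> result = scope.fork(callable);
    // join blocks until all forked tasks have completed
    scope.join();
    System.out.println(result.get());
}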
First, I create a new StructuredTaskScope in a try-with-resources block. StructuredTaskScope implements the interface AutoCloseable, so it is conveniently closed for us after use. Next, I create a Callable and start it with the fork method of StructuredTaskScope. Finally, I wait for the Callable to finish by invoking the join method. The code blocks at the join call until the Callable has completed and then resumes by printing the result to the console.
A short note on running the above code yourself: since structured concurrency is an incubator feature, you won’t be able to just run it with Java 19 installed. You need to explicitly enable preview features and add the concurrency incubator module when you run your program:
java --enable-preview --add-modules=jdk.incubator.concurrent a.b.Example
How to set up a Maven project that uses preview features to compile your code can be found here.
Let’s expand our code a little to see multiple virtual threads running at the same time. As a simple business case, I want to call a service that returns currency conversion rates from Euros into a number of other currencies. The service has a slight delay built in to simulate a slow operation behind the exchange rate query, e.g. a web service call or database access. Since I want to keep this example simple, I will just put the thread to sleep for 500 milliseconds instead of implementing a real web service or database access. Here is the code for the currency conversion rate service:
public class CurrencyConversionService {

    // Simulates a slow lookup (e.g. web service or database) with a 500 ms delay
    public Double getConvertionRate(Currency currency) throws Exception {
        Thread.sleep(500L);
        return currency.getRate();
    }

    public ConversionResult getEuroConvertionResult(Double amount, Currency target) throws Exception {
        return new ConversionResult(
                Double.valueOf(amount * getConvertionRate(target)),
                target,
                amount);
    }

    public enum Currency {
        USD(1.2d), EUR(1d), JPY(144d), CAD(1.44d), GBP(0.88d);

        private final Double rate;

        private Currency(Double rate) { this.rate = rate; }

        public Double getRate() { return rate; }
    }
}
The supported currencies, including their rates, are hardcoded in the Currency enumeration to keep things simple. There are two business methods in this service: one that returns the conversion rate (getConvertionRate) and one that converts a given amount in Euros to another currency (getEuroConvertionResult). The second method returns a ConversionResult object, a simple record that stores the result of the conversion, the currency and the original amount in Euros. We will use the second method later in this post.
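The ConversionResult record itself does not appear in the listings of this post. A minimal sketch of what it could look like follows. The component names (convertedAmount, currency, euroAmount) are assumptions chosen for illustration; only their order and meaning follow from the constructor call in getEuroConvertionResult. The nested Currency enum is a stand-in so that the snippet compiles on its own, and records require Java 16 or later.

```java
// Sketch only: the component names are assumptions, not taken from the original code
class ConversionResultSketch {

    // Stand-in for CurrencyConversionService.Currency so the sketch is self-contained
    enum Currency { USD, EUR, JPY, CAD, GBP }

    // Components in the order used by getEuroConvertionResult:
    // converted amount, target currency, original amount in Euros
    record ConversionResult(Double convertedAmount, Currency currency, Double euroAmount) { }

    public static void main(String[] args) {
        ConversionResult result = new ConversionResult(120d, Currency.USD, 100d);
        // Records generate accessors, equals, hashCode and toString automatically
        System.out.println(result.convertedAmount() + " " + result.currency());
        System.out.println(result);
    }
}
```

Because a record brings its own toString, printing a result later on needs no extra code.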
I will now expand the first example so that the service class is called for every available currency, each call in its own virtual thread, and the results are collected in a HashMap.
CurrencyConversionService currencyService = new CurrencyConversionService();
try (var scope = new StructuredTaskScope<Double>()) {
    Map<Currency, Future<Double>> results = new HashMap<>();
    // Each fork starts one Callable in its own virtual thread
    results.put(USD, scope.fork(() -> currencyService.getConvertionRate(USD)));
    results.put(JPY, scope.fork(() -> currencyService.getConvertionRate(JPY)));
    results.put(EUR, scope.fork(() -> currencyService.getConvertionRate(EUR)));
    results.put(CAD, scope.fork(() -> currencyService.getConvertionRate(CAD)));
    results.put(GBP, scope.fork(() -> currencyService.getConvertionRate(GBP)));
    // Wait until all five tasks are done
    scope.join();
    for (var currency : results.keySet()) {
        System.out.println("Conversion rate for " + currency + " is " +
                results.get(currency).get());
    }
}
The main difference from our first example is that I create five Callables and start each of them with the fork method of the StructuredTaskScope. Then I call join on the StructuredTaskScope to wait until all tasks are done. All Futures are stored in a HashMap and, after all tasks have finished, I can print the results to the console with the currency as the key and the conversion rate as the value.
Now you might wonder why this code is so special compared to using a regular ExecutorService to start Callables. The first thing to mention is that creating an ExecutorService backed by platform threads is a costly operation, which is why ExecutorServices are usually created once and shared as a singleton. Creating one locally, the way the StructuredTaskScope is created in the try-with-resources statement above, is a no-go from a performance point of view. The second point is that using an ExecutorService to start a couple of blocking operations is also not the greatest idea, since the number of platform threads you can have at any given time is quite limited. Each platform thread has its own stack in memory and also needs some additional memory to do its job. The wise people who write Apache Tomcat picked 200 threads as the default for a Tomcat installation, and this is actually a good number in many cases. If you ran the code above on Apache Tomcat with an ExecutorService instead of a StructuredTaskScope, every invocation would block five platform threads, so with a budget of 200 threads the server would become unresponsive once more than 40 invocations were running at the same time.
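For comparison, here is a rough sketch of the same fan-out with a classic ExecutorService and platform threads. The class name, the slowRate helper, the shortened delay of 100 milliseconds and the pool size of five are assumptions made for illustration; the rates mirror the Currency enum of the conversion service.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Sketch only: class and helper names are hypothetical
class ExecutorServiceComparison {

    // Stand-in for the Currency enum of the conversion service
    enum Currency {
        USD(1.2d), EUR(1d), JPY(144d), CAD(1.44d), GBP(0.88d);

        final double rate;

        Currency(double rate) { this.rate = rate; }
    }

    // Simulates the slow rate lookup; blocks the calling platform thread
    static double slowRate(Currency currency) throws InterruptedException {
        Thread.sleep(100L);
        return currency.rate;
    }

    // Runs one lookup per currency on the given pool. Every lookup
    // occupies one pool thread for as long as it is blocked.
    static Map<Currency, Double> fetchAllRates(ExecutorService pool) {
        List<Callable<Double>> tasks = new ArrayList<>();
        for (Currency currency : Currency.values()) {
            tasks.add(() -> slowRate(currency));
        }
        Map<Currency, Double> rates = new EnumMap<>(Currency.class);
        try {
            // invokeAll waits for all tasks and returns the Futures in task order
            List<Future<Double>> futures = pool.invokeAll(tasks);
            for (int i = 0; i < futures.size(); i++) {
                rates.put(Currency.values()[i], futures.get(i).get());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        }
        return rates;
    }

    public static void main(String[] args) {
        // In a real application this pool would be created once and shared
        ExecutorService pool = Executors.newFixedThreadPool(5);
        try {
            System.out.println(fetchAllRates(pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

With a shared pool of 200 threads, 40 concurrent calls to fetchAllRates would already keep every thread busy (40 × 5 = 200), and any further call would have to wait in the queue.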
However, with StructuredTaskScope and virtual threads, the server will not become unresponsive because it runs out of threads. Virtual threads are very lightweight compared to platform threads, and blocking them is quite cheap, since the underlying platform thread that executes the virtual thread simply works on another virtual thread in the meantime. You can easily start a million virtual threads without breaking your application. This is the beauty of using virtual threads to execute business logic concurrently. You don’t need to think about shared thread pools and ExecutorServices anymore. You simply create as many virtual threads as you need through a local StructuredTaskScope and your application will run just fine. No more worrying about thread starvation, because virtual threads make sure that your platform threads are as productive as possible without you having to optimize anything.
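To get a feeling for how cheap blocking a virtual thread is, here is a small sketch that starts many virtual threads directly, without a StructuredTaskScope. It assumes a JDK in which virtual threads are a final feature (Java 21 or later; on Java 19 and 20 the same call needs --enable-preview). The class name, method name and task counts are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch only: assumes Java 21+, names are hypothetical
class ManyVirtualThreads {

    // Starts taskCount virtual threads that each block for sleepMillis
    // and returns how many of them ran to completion
    static int runBlockingTasks(int taskCount, long sleepMillis) {
        AtomicInteger completed = new AtomicInteger();
        List<Thread> threads = new ArrayList<>(taskCount);
        for (int i = 0; i < taskCount; i++) {
            threads.add(Thread.startVirtualThread(() -> {
                try {
                    // Blocking only parks the virtual thread; the carrier
                    // platform thread is free to run other virtual threads
                    Thread.sleep(sleepMillis);
                    completed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }));
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println(runBlockingTasks(10_000, 500L) + " tasks completed");
    }
}
```

Trying the same with 10,000 platform threads would put noticeably more pressure on memory and the operating system scheduler, which is why platform threads are pooled in the first place.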
Subclassing StructuredTaskScope
One thing that could be improved in the last example is the use of a HashMap to store the results of our Callables. It would be much nicer if we could simply start all Callables and get the results back from the StructuredTaskScope, and the API has a nice callback to achieve that. You can override the handleComplete method, which is called whenever a Callable completes and is passed the corresponding Future. In this method you can manipulate the result or store it somewhere for easy access.
Subclassing StructuredTaskScope will look like this:
public class CurrencyConverterScope extends StructuredTaskScope<ConversionResult> {

    // Thread-safe, because several Callables may complete at the same time
    private final Queue<ConversionResult> queue = new ConcurrentLinkedQueue<>();

    @Override
    protected void handleComplete(Future<ConversionResult> future) {
        if (future.isDone()) {
            try {
                queue.add(future.get());
            } catch (Exception e) {
                // The task failed or was cancelled, so there is no result to store
                System.out.println("Exception converting result.");
            }
        }
    }

    public Queue<ConversionResult> getResults() { return queue; }
}
First, I create a subclass of StructuredTaskScope in order to override the handleComplete method. As the generic type, we use the specialized record ConversionResult, which gives us a way to store the result in a structured way. Next, I define a ConcurrentLinkedQueue to store the results in. This needs to be a concurrent data structure, because some Callables might complete at the same time. If you used a regular ArrayList here, two threads inserting an element at the same time could silently lose an element or fail with an ArrayIndexOutOfBoundsException, because ArrayList is not thread-safe. Then I override the handleComplete method and put the result into the queue. Finally, there is a getter for the results queue.
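To see the thread-safety of the queue in isolation, here is a small sketch that is independent of StructuredTaskScope: several plain threads add elements to a ConcurrentLinkedQueue at the same time, and no element gets lost. The class name, method name and counts are made up for illustration.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;

// Sketch only: names are hypothetical
class ConcurrentCollectDemo {

    // Lets threadCount threads add addsPerThread elements each to one
    // shared queue and returns the number of elements that arrived
    static int collectConcurrently(int threadCount, int addsPerThread) {
        Queue<Integer> queue = new ConcurrentLinkedQueue<>();
        // The latch releases all threads at once to maximize contention
        CountDownLatch start = new CountDownLatch(1);
        Thread[] threads = new Thread[threadCount];
        for (int t = 0; t < threadCount; t++) {
            threads[t] = new Thread(() -> {
                try {
                    start.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                for (int i = 0; i < addsPerThread; i++) {
                    queue.add(i);
                }
            });
            threads[t].start();
        }
        start.countDown();
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return queue.size();
    }

    public static void main(String[] args) {
        // 8 threads x 10,000 adds: all 80,000 elements are in the queue
        System.out.println(collectConcurrently(8, 10_000));
    }
}
```

Swapping the queue for a plain ArrayList in this sketch gives no such guarantee; the resulting size may then differ from run to run.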
Let’s modify our example to use the CurrencyConverterScope and the ConversionResult. I will also use the second business method of our converter service, which already uses the ConversionResult.
try (var scope = new CurrencyConverterScope()) {
    scope.fork(() -> currencyService.getEuroConvertionResult(100d, USD));
    scope.fork(() -> currencyService.getEuroConvertionResult(100d, JPY));
    scope.fork(() -> currencyService.getEuroConvertionResult(100d, EUR));
    scope.fork(() -> currencyService.getEuroConvertionResult(100d, CAD));
    scope.fork(() -> currencyService.getEuroConvertionResult(100d, GBP));
    scope.join();
    // handleComplete has collected every result in the scope's queue
    for (var result : scope.getResults()) {
        System.out.println(result);
    }
}
The code looks much cleaner this way. We use the second method of our conversion service to store the result and the input parameters in a ConversionResult, which is what our Futures now return. All ConversionResults are available through the getResults method of our custom scope. This way, all logic concerning threading and result processing is encapsulated inside the CurrencyConverterScope.
Conclusion
That’s all I want to cover about structured concurrency for today. I think it will provide a nice API for writing clean, easy-to-understand code once it is fully released. It will make concurrent programming a lot more accessible, and many drawbacks of the old platform thread model are simply resolved in the background for you when using virtual threads. There are also quite nice callback methods you can override to encapsulate your result processing and provide results in a convenient way.
If you want to check out the code examples, you can find them, as always, in my GitHub repository.