model: Chain completions to avoid race conditions

Otherwise getConfigAsync().thenCompose(x -> setState()) would be unsafe.

This reverts commit a6595a273afd50524cc66765c6bfbdcc34cb12e4.

Signed-off-by: Jason A. Donenfeld <Jason@zx2c4.com>
This commit is contained in:
Samuel Holland 2018-01-09 10:03:06 -06:00
parent daacc06a0d
commit 933a685585
2 changed files with 18 additions and 23 deletions

View File

@ -91,21 +91,24 @@ public class Tunnel extends BaseObservable implements Keyed<String> {
return CompletableFuture.completedFuture(statistics); return CompletableFuture.completedFuture(statistics);
} }
void onConfigChanged(final Config config) { Config onConfigChanged(final Config config) {
this.config = config; this.config = config;
notifyPropertyChanged(BR.config); notifyPropertyChanged(BR.config);
return config;
} }
void onStateChanged(final State state) { State onStateChanged(final State state) {
if (state != State.UP) if (state != State.UP)
onStatisticsChanged(null); onStatisticsChanged(null);
this.state = state; this.state = state;
notifyPropertyChanged(BR.state); notifyPropertyChanged(BR.state);
return state;
} }
void onStatisticsChanged(final Statistics statistics) { Statistics onStatisticsChanged(final Statistics statistics) {
this.statistics = statistics; this.statistics = statistics;
notifyPropertyChanged(BR.statistics); notifyPropertyChanged(BR.statistics);
return statistics;
} }
public CompletionStage<Tunnel> rename(@NonNull final String name) { public CompletionStage<Tunnel> rename(@NonNull final String name) {

View File

@ -110,24 +110,18 @@ public final class TunnelManager extends BaseObservable {
} }
CompletionStage<Config> getTunnelConfig(final Tunnel tunnel) { CompletionStage<Config> getTunnelConfig(final Tunnel tunnel) {
final CompletionStage<Config> completion = return asyncWorker.supplyAsync(() -> configStore.load(tunnel.getName()))
asyncWorker.supplyAsync(() -> configStore.load(tunnel.getName())); .thenApply(tunnel::onConfigChanged);
completion.thenAccept(tunnel::onConfigChanged);
return completion;
} }
CompletionStage<State> getTunnelState(final Tunnel tunnel) { CompletionStage<State> getTunnelState(final Tunnel tunnel) {
final CompletionStage<State> completion = return asyncWorker.supplyAsync(() -> backend.getState(tunnel))
asyncWorker.supplyAsync(() -> backend.getState(tunnel)); .thenApply(tunnel::onStateChanged);
completion.thenAccept(tunnel::onStateChanged);
return completion;
} }
CompletionStage<Statistics> getTunnelStatistics(final Tunnel tunnel) { CompletionStage<Statistics> getTunnelStatistics(final Tunnel tunnel) {
final CompletionStage<Statistics> completion = return asyncWorker.supplyAsync(() -> backend.getStatistics(tunnel))
asyncWorker.supplyAsync(() -> backend.getStatistics(tunnel)); .thenApply(tunnel::onStatisticsChanged);
completion.thenAccept(tunnel::onStatisticsChanged);
return completion;
} }
public ObservableKeyedList<String, Tunnel> getTunnels() { public ObservableKeyedList<String, Tunnel> getTunnels() {
@ -227,23 +221,21 @@ public final class TunnelManager extends BaseObservable {
} }
CompletionStage<Config> setTunnelConfig(final Tunnel tunnel, final Config config) { CompletionStage<Config> setTunnelConfig(final Tunnel tunnel, final Config config) {
final CompletionStage<Config> completion = asyncWorker.supplyAsync(() -> { return asyncWorker.supplyAsync(() -> {
final Config appliedConfig = backend.applyConfig(tunnel, config); final Config appliedConfig = backend.applyConfig(tunnel, config);
return configStore.save(tunnel.getName(), appliedConfig); return configStore.save(tunnel.getName(), appliedConfig);
}); }).thenApply(tunnel::onConfigChanged);
completion.thenAccept(tunnel::onConfigChanged);
return completion;
} }
CompletionStage<State> setTunnelState(final Tunnel tunnel, final State state) { CompletionStage<State> setTunnelState(final Tunnel tunnel, final State state) {
final CompletionStage<State> completion = // Ensure the configuration is loaded before trying to use it.
asyncWorker.supplyAsync(() -> backend.setState(tunnel, state)); return tunnel.getConfigAsync().thenCompose(x ->
completion.whenComplete((newState, e) -> { asyncWorker.supplyAsync(() -> backend.setState(tunnel, state))
).whenComplete((newState, e) -> {
// Ensure onStateChanged is always called (failure or not), and with the correct state. // Ensure onStateChanged is always called (failure or not), and with the correct state.
tunnel.onStateChanged(e == null ? newState : tunnel.getState()); tunnel.onStateChanged(e == null ? newState : tunnel.getState());
if (e == null && newState == State.UP) if (e == null && newState == State.UP)
setLastUsedTunnel(tunnel); setLastUsedTunnel(tunnel);
}); });
return completion;
} }
} }