仰起嘴角 2019-12-17
import java.util.LinkedHashMap; import java.util.Map; import java.util.NoSuchElementException; import java.util.function.Function; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @Slf4j public class TestFindResult { private static final Map<String, String> templates; private static final int sleep = 1000; static { templates = new LinkedHashMap<>(); templates.put("aDB", "a"); templates.put("bDB", "b"); templates.put("cDB", "c"); } public Mono<String> findResult(Function<String, Mono<String>> query) { return Flux.fromIterable(templates.values()) .flatMap(query) .next() .onErrorResume(NoSuchElementException.class, e -> Mono.empty()) .onErrorMap(IndexOutOfBoundsException.class, MultipleUpstreamException::new); } public static void main(String[] args) { TestFindResult test = new TestFindResult(); Function<String, Mono<String>> query = (value) -> { try { Thread.sleep(sleep); // mock DB query } catch (InterruptedException e) { e.printStackTrace(); } log.info( "Thread id:{}, Thread name:{}, value:{}, used ms:{}", Thread.currentThread().getId(), Thread.currentThread().getName(), value, sleep); return Mono.just(value); }; System.out.println(test.findResult(query).subscribe()); } }
import java.util.LinkedHashMap; import java.util.Map; import java.util.Map.Entry; import java.util.NoSuchElementException; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.time.StopWatch; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @Slf4j public class TestFindMongo { private static final Map<String, String> templates; private static final int sleep = 1000; static { templates = new LinkedHashMap<>(); templates.put("aDB", "a"); templates.put("bDB", "b"); templates.put("cDB", "c"); } public Mono<String> findMongo() { StopWatch stopWatch = StopWatch.createStarted(); return Flux.fromIterable(templates.entrySet()) .filterWhen( template -> { String key = template.getKey(); String value = template.getValue(); try { Thread.sleep(sleep); // mock DB query } catch (InterruptedException e) { e.printStackTrace(); } log.info( "Thread id:{}, Thread name:{}, query:{}, value:{} , used ms:{}", Thread.currentThread().getId(), Thread.currentThread().getName(), key, value, sleep); return Mono.just(value.equals("b")); }) .next() .doOnSuccess(templateEntry -> log.info("Match {} ", templateEntry.getKey())) .map(Entry::getValue) .onErrorResume(NoSuchElementException.class, e -> Mono.empty()) .onErrorMap(IndexOutOfBoundsException.class, MultipleUpstreamException::new) .doOnTerminate(() -> log.info("Database recon took {} ms", stopWatch.getTime())); } public static void main(String[] args) { TestFindMongo test = new TestFindMongo(); System.out.println(test.findMongo().subscribe()); } }
import static org.springframework.http.HttpStatus.*;

import org.springframework.web.server.ResponseStatusException;
import reactor.core.publisher.Flux;

/**
 * Signals that a query matched more than one upstream. Carries HTTP status {@code BAD_REQUEST}
 * and a fixed reason text telling the client how to narrow the query.
 */
public class MultipleUpstreamException extends ResponseStatusException {

  private static final String MULTIPLE_UPSTREAM_MATCH_ERR =
      "Your query contains properties matching multiple upstreams. "
          + "Data for multiple upstreams can‘t be returned in one query. "
          + "Please either specify upstream by providing publisherSystem "
          + "(GSM,MUNI_ITICKET,MUNI_OASYS,TPSDERIV,EDLR) "
          + "and region or request deal properties matching only one upstream";

  /** Creates the exception with status {@code BAD_REQUEST} and the fixed reason text. */
  MultipleUpstreamException() {
    super(BAD_REQUEST, MULTIPLE_UPSTREAM_MATCH_ERR);
  }

  /**
   * Constructor shaped as a {@code Function<IndexOutOfBoundsException, Throwable>} so it can be
   * passed as a method reference to {@code onErrorMap}. The original exception is kept as the
   * cause so it still appears in stack traces.
   *
   * @param indexOutOfBoundsException emitted on {@link Flux#single()} when the Flux has more than
   *     one element
   * @see reactor.core.publisher.Mono#onErrorMap(Class, java.util.function.Function)
   */
  MultipleUpstreamException(IndexOutOfBoundsException indexOutOfBoundsException) {
    super(BAD_REQUEST, MULTIPLE_UPSTREAM_MATCH_ERR, indexOutOfBoundsException);
  }
}