Как правильно закрыть переменное количество потоков?


Я создаю несколько потоков, к которым я должен обращаться параллельно (или, возможно,-параллельно). Я знаю, как сделать попытку с ресурсами, когда количество ресурсов фиксировано во время компиляции, но что делать, если количество ресурсов определяется параметром?

У меня есть что-то вроде этого:

private static void foo(String path, String... files) throws IOException {
    @SuppressWarnings("unchecked")
    Stream<String>[] streams = new Stream[files.length];

    try {
        for (int i = 0; i < files.length; i++) {
            final String file = files[i];
            streams[i] = Files.lines(Paths.get(path, file))
                .onClose(() -> System.out.println("Closed " + file));
        }

        // do something with streams
        Stream.of(streams)
            .parallel()
            .flatMap(x -> x)
            .distinct()
            .sorted()
            .limit(10)
            .forEach(System.out::println);
    }
    finally {
        for (Stream<String> s : streams) {
            if (s != null) {
                s.close();
            }
        }
    }
}
2 5

2 ответа:

Можно написать композит AutoCloseable для управления динамическим количеством AutoCloseable:

import java.util.ArrayList;
import java.util.List;

public class CompositeAutoclosable<T extends AutoCloseable> implements AutoCloseable {
    private final List<T> components= new ArrayList<>();

    public void addComponent(T component) { components.add(component); }

    public List<T> getComponents() { return components; }

    @Override
    public void close() throws Exception {
        Exception e = null;
        for (T component : components) {
            try { component.close(); }
            catch (Exception closeException) {
                if (e == null) { e = closeException; }
                else { e.addSuppressed(closeException); }
            }
        }
        if (e != null) { throw e; }
    }
}

И вы можете использовать его в своем методе:

private static void foo(String path, String... files) throws Exception {
    try (CompositeAutoclosable<Stream<String>> streams 
            = new CompositeAutoclosable<Stream<String>>()) {
        for (int i = 0; i < files.length; i++) {
            final String file = files[i];
            streams.addComponent(Files.lines(Paths.get(path, file))
                .onClose(() -> System.out.println("Closed " + file)));
        }
        streams.getComponents().stream()
            .parallel()
            .flatMap(x -> x)
            .distinct()
            .sorted()
            .limit(10)
            .forEach(System.out::println);
    }
}

Документация Stream.flatMap говорит:

Каждый отображенный поток закрывается после того, как его содержимое было помещено в этот поток.

Другими словами, для обычного закрытия потоков не требуется никаких дополнительных действий. Однако, поскольку закрыты только обработанные потоки, вы не должны создавать потоки нетерпеливо, не зная, будут ли они позже обработаны потоком:
private static void foo(String path, String... files) throws IOException {
    Arrays.stream(files).flatMap(file-> {
              try { return Files.lines(Paths.get(path, file))
                    .onClose(() -> System.out.println("Closed " + file)); }
              catch(IOException ex) { throw new UncheckedIOException(ex); } })
          .parallel()
          .distinct()
          .sorted()
          .limit(10)
          .forEachOrdered(System.out::println);
}

Создавая подпотоки внутри flatMap, это гарантировано что каждый из них создается только в том случае, если поток будет его обрабатывать. Таким образом, это решение закроет все подпотоки даже без наличия внешнего Stream внутри оператора try-with-resource.