Многопоточный UDP сервер с Netty


Я пытаюсь реализовать сервер UDP с Netty. Идея состоит в том, чтобы связать только один раз (следовательно, создать только один Channel). Этот Channel инициализируется только одним обработчиком, который отправляет обработку входящих датаграмм между несколькими потоками через ExecutorService.

@Configuration
public class SpringConfig {

    @Autowired
    private Dispatcher dispatcher;

    private String host;

    private int port;

    @Bean
    public Bootstrap bootstrap() throws Exception {
        Bootstrap bootstrap = new Bootstrap()
            .group(new NioEventLoopGroup(1))
            .channel(NioDatagramChannel.class)
            .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .handler(dispatcher);

        ChannelFuture future = bootstrap.bind(host, port).await();
        if(!future.isSuccess())
            throw new Exception(String.format("Fail to bind on [host = %s , port = %d].", host, port), future.cause());

        return bootstrap;
    }
}

@Component
@Sharable
public class Dispatcher extends ChannelInboundHandlerAdapter implements InitializingBean {

    private int workerThreads;

    private ExecutorService executorService;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        DatagramPacket packet = (DatagramPacket) msg;

        final Channel channel = ctx.channel();

        executorService.execute(new Runnable() {
            @Override
            public void run() {
                //Process the packet and produce a response packet (below)              
                DatagramPacket responsePacket = ...;

                ChannelFuture future;
                try {
                    future = channel.writeAndFlush(responsePacket).await();
                } catch (InterruptedException e) {
                    return;
                }
                if(!future.isSuccess())
                    log.warn("Failed to write response packet.");
            }
        });
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        executorService = Executors.newFixedThreadPool(workerThreads);
    }
}

У меня есть следующие вопросы:

  1. Должен ли DatagramPacket, полученный методом channelRead класса Dispatcher, быть продублирован перед использованием рабочим потоком? Интересно, если этот пакет будет уничтожен после Метод channelRead возвращает значение, даже если ссылка хранится в рабочем потоке.
  2. безопасно ли разделить Channel между всеми рабочими потоками и позволить им вызывать writeAndFlush одновременно?

Спасибо!

1 3

1 ответ:

  1. - нет. Если вам нужно, чтобы объект жил дольше, вы либо превращаете его во что-то другое, либо используете ReferenceCountUtil.retain(datagram), а затем ReferenceCountUtil.release(datagram), Как только вы закончите с ним. Вы также не должны делать await() в службе executor, вы должны зарегистрировать обработчик для всего, что происходит.

  2. Да, объекты канала потокобезопасны и могут вызываться из множества различных потоков.