Многопоточный UDP сервер с Netty
Я пытаюсь реализовать сервер UDP с Netty. Идея состоит в том, чтобы связать только один раз (следовательно, создать только один Channel
). Этот Channel
инициализируется только одним обработчиком, который отправляет обработку входящих датаграмм между несколькими потоками через ExecutorService
.
@Configuration
public class SpringConfig {
@Autowired
private Dispatcher dispatcher;
private String host;
private int port;
@Bean
public Bootstrap bootstrap() throws Exception {
Bootstrap bootstrap = new Bootstrap()
.group(new NioEventLoopGroup(1))
.channel(NioDatagramChannel.class)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.handler(dispatcher);
ChannelFuture future = bootstrap.bind(host, port).await();
if(!future.isSuccess())
throw new Exception(String.format("Fail to bind on [host = %s , port = %d].", host, port), future.cause());
return bootstrap;
}
}
@Component
@Sharable
public class Dispatcher extends ChannelInboundHandlerAdapter implements InitializingBean {
private int workerThreads;
private ExecutorService executorService;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
DatagramPacket packet = (DatagramPacket) msg;
final Channel channel = ctx.channel();
executorService.execute(new Runnable() {
@Override
public void run() {
//Process the packet and produce a response packet (below)
DatagramPacket responsePacket = ...;
ChannelFuture future;
try {
future = channel.writeAndFlush(responsePacket).await();
} catch (InterruptedException e) {
return;
}
if(!future.isSuccess())
log.warn("Failed to write response packet.");
}
});
}
@Override
public void afterPropertiesSet() throws Exception {
executorService = Executors.newFixedThreadPool(workerThreads);
}
}
У меня есть следующие вопросы:
- Должен ли
DatagramPacket
, полученный методомchannelRead
классаDispatcher
, быть продублирован перед использованием рабочим потоком? Интересно, если этот пакет будет уничтожен после МетодchannelRead
возвращает значение, даже если ссылка хранится в рабочем потоке. - безопасно ли разделить
Channel
между всеми рабочими потоками и позволить им вызыватьwriteAndFlush
одновременно?
Спасибо!
1 ответ:
- нет. Если вам нужно, чтобы объект жил дольше, вы либо превращаете его во что-то другое, либо используете
ReferenceCountUtil.retain(datagram)
, а затемReferenceCountUtil.release(datagram)
, Как только вы закончите с ним. Вы также не должны делатьawait()
в службе executor, вы должны зарегистрировать обработчик для всего, что происходит.Да, объекты канала потокобезопасны и могут вызываться из множества различных потоков.