Странное поведение Hazelcat IMap#put ()
Моя программа на основе Hazelcast может работать в двух режимах: отправитель и рабочий.
Отправитель помещает некоторое POJO на распределенную карту с помощью некоторого ключа, например: hazelcastInstance.getMap(MAP_NAME).put(key, value);
Рабочий имеет бесконечный цикл (с Thread.sleep(1000L);
Внутри для таймаута), который должен обрабатывать объекты из карты. Сейчас я просто печатаю размер карты в этом цикле.
put()
является синхронным, поэтому он гарантирует, что после его возврата запись помещается на распределенную карту иреплицируется . Но в моем эксперименте это не так.
UPD (код)
Отправитель:
public void submit(String key) {
Object mySerializableObject = ...
IMap<String, Object> map = hazelcastInstance.getMap(MAP_NAME);
map.putIfAbsent(key, mySerializableObject, TASK_TTL_IN_HOURS, TimeUnit.HOURS);
}
Рабочий:
public void process() {
while (true) {
IMap<String, Object> map = hazelcastInstance.getMap(MAP_NAME);
System.out.println(map.size());
// Optional<Map.Entry<String, Object>> objectToProcess = getObjectToProcess();
// objectToProcess.ifPresent(objectToProcess-> processObject(id, objectToProcess));
try {
Thread.sleep(PAUSE);
} catch (InterruptedException e) {
LOGGER.error(e.getMessage(), e);
}
}
}
Я прокомментировал саму часть "обработки", потому что теперь я просто пытаюсь получить согласованное состояние карты. Приведенный выше код каждый раз выводит разные результаты, например: : "4, 3, 1, 1, 1, 1, 1...(таким образом, он может даже увидеть 4 представленных задания на мгновение, но затем они... исчезнуть).
UPD (log)
Рабочий:
...
tasksMap.size() = 0
tasksMap.size() = 0
tasksMap.size() = 0
tasksMap.size() = 0
tasksMap.size() = 1
tasksMap.size() = 2
tasksMap.size() = 2
tasksMap.size() = 2
tasksMap.size() = 2
tasksMap.size() = 2
...
Отправитель 1:
Before: tasksMap.size() = 0
After: tasksMap.size() = 1
Отправитель 2:
Before: tasksMap.size() = 1
After: tasksMap.size() = 4
Отправитель 3:
Before: tasksMap.size() = 1
After: tasksMap.size() = 2
Отправитель 4:
Before: tasksMap.size() = 3
After: tasksMap.size() = 4
1 ответ:
Ну, я думаю, я понял проблему. Насколько я понимаю, распределенный
Я решил эту проблему, заменивIMap
, возвращаемыйhazelcastInstance.getMap
, не гарантирует, что данные реплицируются по всем существующим узлам в кластере: некоторые части данных могут быть реплицированы на некоторые узлы, другая часть - на другие узлы. Вот почему в моем примере некоторые из представленных задач были реплицированы не на рабочий узел (который работает постоянно), а на некоторые другие отправители, которые прекращают свое выполнение после отправки. Так что такое записи были потеряны на выходе отправителей.hazelcastInstance.getMap
наhazelcastInstance.getReplicatedMap
. Этот метод возвращаетReplicatedMap
, который, AFAIK, гарантирует, что записи, помещенные в него, будут реплицированы на все узлы кластера. Так что теперь в моей системе все работает нормально.