Странное поведение Hazelcat IMap#put ()


Моя программа на основе Hazelcast может работать в двух режимах: отправитель и рабочий.

Отправитель помещает некоторое POJO на распределенную карту с помощью некоторого ключа, например: hazelcastInstance.getMap(MAP_NAME).put(key, value);

Рабочий имеет бесконечный цикл (с Thread.sleep(1000L); Внутри для таймаута), который должен обрабатывать объекты из карты. Сейчас я просто печатаю размер карты в этом цикле.

Теперь вот в чем проблема. Я запускаю приложение worker. Затем я запускаю четыре отправителя одновременно (каждый добавляет запись на карту и завершает свою работу). Но после того, как все приложения-отправители завершены, приложение worker печатает произвольный размер: иногда он обнаруживает, что была добавлена только одна запись, иногда две, иногда три (на самом деле он никогда не видел все четыре записи). В чем проблема с этим простым потоком? Я читал в Hazelcast docs, что метод put() является синхронным, поэтому он гарантирует, что после его возврата запись помещается на распределенную карту иреплицируется . Но в моем эксперименте это не так.

UPD (код)

Отправитель:

public void submit(String key) {
    Object mySerializableObject = ...
    IMap<String, Object> map = hazelcastInstance.getMap(MAP_NAME);
    map.putIfAbsent(key, mySerializableObject, TASK_TTL_IN_HOURS, TimeUnit.HOURS);
}

Рабочий:

public void process() {
    while (true) {
        IMap<String, Object> map = hazelcastInstance.getMap(MAP_NAME);
        System.out.println(map.size());

        // Optional<Map.Entry<String, Object>> objectToProcess = getObjectToProcess();
        // objectToProcess.ifPresent(objectToProcess-> processObject(id, objectToProcess));
        try {
            Thread.sleep(PAUSE);
        } catch (InterruptedException e) {
            LOGGER.error(e.getMessage(), e);
        }
    }
}

Я прокомментировал саму часть "обработки", потому что теперь я просто пытаюсь получить согласованное состояние карты. Приведенный выше код каждый раз выводит разные результаты, например: : "4, 3, 1, 1, 1, 1, 1...(таким образом, он может даже увидеть 4 представленных задания на мгновение, но затем они... исчезнуть).

UPD (log)

Рабочий:

...
tasksMap.size() = 0
tasksMap.size() = 0
tasksMap.size() = 0
tasksMap.size() = 0
tasksMap.size() = 1
tasksMap.size() = 2
tasksMap.size() = 2
tasksMap.size() = 2
tasksMap.size() = 2
tasksMap.size() = 2
...

Отправитель 1:

Before: tasksMap.size() = 0
After: tasksMap.size() = 1

Отправитель 2:

Before: tasksMap.size() = 1
After: tasksMap.size() = 4

Отправитель 3:

Before: tasksMap.size() = 1
After: tasksMap.size() = 2

Отправитель 4:

Before: tasksMap.size() = 3
After: tasksMap.size() = 4
1 14

1 ответ:

Ну, я думаю, я понял проблему. Насколько я понимаю, распределенный IMap, возвращаемый hazelcastInstance.getMap, не гарантирует, что данные реплицируются по всем существующим узлам в кластере: некоторые части данных могут быть реплицированы на некоторые узлы, другая часть - на другие узлы. Вот почему в моем примере некоторые из представленных задач были реплицированы не на рабочий узел (который работает постоянно), а на некоторые другие отправители, которые прекращают свое выполнение после отправки. Так что такое записи были потеряны на выходе отправителей.

Я решил эту проблему, заменив hazelcastInstance.getMap на hazelcastInstance.getReplicatedMap. Этот метод возвращает ReplicatedMap, который, AFAIK, гарантирует, что записи, помещенные в него, будут реплицированы на все узлы кластера. Так что теперь в моей системе все работает нормально.