Почему мой канал Go возвращает один и тот же элемент более одного раза


У меня есть простое приложение, над которым я работаю, чтобы прочитать opLog репликации MongoDB, сериализовать результаты в структуру Go и отправить его в канал для обработки. В настоящее время я читаю с этого канала и просто распечатываю значения внутри структуры.

Я пробовал читать значения из канала, используя for/range, простое чтение непосредственно из него и помещая его в select с таймаутом. Результаты все те же. Каждый раз, когда я запускаю код, я получите разные результаты от канала. Я вижу, что каждый раз канал записывается тоже один раз, однако при чтении с этого канала я иногда считываю одно и то же значение 1-3, иногда даже 4 раза, даже с одной записью.

Обычно это происходит только на начальной нагрузке (вытягивание старых записей) и, кажется, не происходит при чтении живых дополнений к каналу. Есть ли какая-то проблема, когда чтение из канала слишком быстро происходит до того, как элемент удаляется из него? первый раз ее читаешь?

package main

import (
    "fmt"
    "labix.org/v2/mgo"
    "labix.org/v2/mgo/bson"
)

type Operation struct {
    Id        int64  `bson:"h" json:"id"`
    Operator  string `bson:"op" json:"operator"`
    Namespace string `bson:"ns" json:"namespace"`
    Select    bson.M `bson:"o" json:"select"`
    Update    bson.M `bson:"o2" json:"update"`
    Timestamp int64  `bson:"ts" json:"timestamp"`
}

func Tail(collection *mgo.Collection, Out chan<- *Operation) {
    iter := collection.Find(nil).Tail(-1)
    var oper *Operation

    for {
        for iter.Next(&oper) {
            fmt.Println("n<<", oper.Id)
            Out <- oper
        }

        if err := iter.Close(); err != nil {
            fmt.Println(err)
            return
        }
    }
}

func main() {
    session, err := mgo.Dial("127.0.0.1")

    if err != nil {
        panic(err)
    }
    defer session.Close()

    c := session.DB("local").C("oplog.rs")

    cOper := make(chan *Operation, 1)

    go Tail(c, cOper)

    for operation := range cOper {
        fmt.Println()
        fmt.Println("Id: ", operation.Id)
        fmt.Println("Operator: ", operation.Operator)
        fmt.Println("Namespace: ", operation.Namespace)
        fmt.Println("Select: ", operation.Select)
        fmt.Println("Update: ", operation.Update)
        fmt.Println("Timestamp: ", operation.Timestamp)
    }
}
2 3

2 ответа:

Я думаю, что вы повторно используете свой *Operation, который вызывает проблемы. Например:

Http://play.golang.org/p/_MeSBLWPwN

c := make(chan *int, 1)

go func() {
    val := new(int)
    for i :=0; i<10; i++ {
        *val = i
        c <- val
    }
    close(c)
}()


for val := range c {
    time.Sleep(time.Millisecond * 1)
    fmt.Println(*val)
}

Этот код приводит к следующему:

2
3
4
5
6
7
8
9
9
9

И что еще более важно, это не потокобезопасно. Попробуйте сделать это, возможно:

for {
    for { 
        var oper *Operation
        if !iter.Next(&oper) {
            break
        }
        fmt.Println("\n<<", oper.Id)
        Out <- oper
    }
    ...
}

Или используйте простой Operation вместо *Operation. (Потому что без указателя значение копируется)

Я думаю, что вы каждый раз десериализуетесь в один и тот же экземпляр структуры, поэтому один и тот же объект читается каналом и переписывается отправителем. Попробуйте просто переместить его инициализацию в цикл, чтобы каждый раз создавать новый.

Вы также можете запустить этот код с go run -race или go build -race, он предупреждает о таких вещах.