Почему мой канал Go возвращает один и тот же элемент более одного раза
У меня есть простое приложение, над которым я работаю, чтобы прочитать opLog репликации MongoDB, сериализовать результаты в структуру Go и отправить его в канал для обработки. В настоящее время я читаю с этого канала и просто распечатываю значения внутри структуры.
Я пробовал читать значения из канала, используя for/range, простое чтение непосредственно из него и помещая его в select с таймаутом. Результаты все те же. Каждый раз, когда я запускаю код, я получите разные результаты от канала. Я вижу, что каждый раз канал записывается тоже один раз, однако при чтении с этого канала я иногда считываю одно и то же значение 1-3, иногда даже 4 раза, даже с одной записью.
Обычно это происходит только на начальной нагрузке (вытягивание старых записей) и, кажется, не происходит при чтении живых дополнений к каналу. Есть ли какая-то проблема, когда чтение из канала слишком быстро происходит до того, как элемент удаляется из него? первый раз ее читаешь?
package main
import (
    "fmt"
    "labix.org/v2/mgo"
    "labix.org/v2/mgo/bson"
)
type Operation struct {
    Id        int64  `bson:"h" json:"id"`
    Operator  string `bson:"op" json:"operator"`
    Namespace string `bson:"ns" json:"namespace"`
    Select    bson.M `bson:"o" json:"select"`
    Update    bson.M `bson:"o2" json:"update"`
    Timestamp int64  `bson:"ts" json:"timestamp"`
}
func Tail(collection *mgo.Collection, Out chan<- *Operation) {
    iter := collection.Find(nil).Tail(-1)
    var oper *Operation
    for {
        for iter.Next(&oper) {
            fmt.Println("n<<", oper.Id)
            Out <- oper
        }
        if err := iter.Close(); err != nil {
            fmt.Println(err)
            return
        }
    }
}
func main() {
    session, err := mgo.Dial("127.0.0.1")
    if err != nil {
        panic(err)
    }
    defer session.Close()
    c := session.DB("local").C("oplog.rs")
    cOper := make(chan *Operation, 1)
    go Tail(c, cOper)
    for operation := range cOper {
        fmt.Println()
        fmt.Println("Id: ", operation.Id)
        fmt.Println("Operator: ", operation.Operator)
        fmt.Println("Namespace: ", operation.Namespace)
        fmt.Println("Select: ", operation.Select)
        fmt.Println("Update: ", operation.Update)
        fmt.Println("Timestamp: ", operation.Timestamp)
    }
}
2 ответа:
Я думаю, что вы повторно используете свой
*Operation, который вызывает проблемы. Например:Http://play.golang.org/p/_MeSBLWPwN
c := make(chan *int, 1) go func() { val := new(int) for i :=0; i<10; i++ { *val = i c <- val } close(c) }() for val := range c { time.Sleep(time.Millisecond * 1) fmt.Println(*val) }Этот код приводит к следующему:
2 3 4 5 6 7 8 9 9 9И что еще более важно, это не потокобезопасно. Попробуйте сделать это, возможно:
for { for { var oper *Operation if !iter.Next(&oper) { break } fmt.Println("\n<<", oper.Id) Out <- oper } ... }Или используйте простой
Operationвместо*Operation. (Потому что без указателя значение копируется)
Я думаю, что вы каждый раз десериализуетесь в один и тот же экземпляр структуры, поэтому один и тот же объект читается каналом и переписывается отправителем. Попробуйте просто переместить его инициализацию в цикл, чтобы каждый раз создавать новый.
Вы также можете запустить этот код с
go run -raceилиgo build -race, он предупреждает о таких вещах.