Почему мой канал Go возвращает один и тот же элемент более одного раза
У меня есть простое приложение, над которым я работаю, чтобы прочитать opLog репликации MongoDB, сериализовать результаты в структуру Go и отправить его в канал для обработки. В настоящее время я читаю с этого канала и просто распечатываю значения внутри структуры.
Я пробовал читать значения из канала, используя for/range, простое чтение непосредственно из него и помещая его в select с таймаутом. Результаты все те же. Каждый раз, когда я запускаю код, я получите разные результаты от канала. Я вижу, что каждый раз канал записывается тоже один раз, однако при чтении с этого канала я иногда считываю одно и то же значение 1-3, иногда даже 4 раза, даже с одной записью.
Обычно это происходит только на начальной нагрузке (вытягивание старых записей) и, кажется, не происходит при чтении живых дополнений к каналу. Есть ли какая-то проблема, когда чтение из канала слишком быстро происходит до того, как элемент удаляется из него? первый раз ее читаешь?
package main
import (
"fmt"
"labix.org/v2/mgo"
"labix.org/v2/mgo/bson"
)
type Operation struct {
Id int64 `bson:"h" json:"id"`
Operator string `bson:"op" json:"operator"`
Namespace string `bson:"ns" json:"namespace"`
Select bson.M `bson:"o" json:"select"`
Update bson.M `bson:"o2" json:"update"`
Timestamp int64 `bson:"ts" json:"timestamp"`
}
func Tail(collection *mgo.Collection, Out chan<- *Operation) {
iter := collection.Find(nil).Tail(-1)
var oper *Operation
for {
for iter.Next(&oper) {
fmt.Println("n<<", oper.Id)
Out <- oper
}
if err := iter.Close(); err != nil {
fmt.Println(err)
return
}
}
}
func main() {
session, err := mgo.Dial("127.0.0.1")
if err != nil {
panic(err)
}
defer session.Close()
c := session.DB("local").C("oplog.rs")
cOper := make(chan *Operation, 1)
go Tail(c, cOper)
for operation := range cOper {
fmt.Println()
fmt.Println("Id: ", operation.Id)
fmt.Println("Operator: ", operation.Operator)
fmt.Println("Namespace: ", operation.Namespace)
fmt.Println("Select: ", operation.Select)
fmt.Println("Update: ", operation.Update)
fmt.Println("Timestamp: ", operation.Timestamp)
}
}
2 ответа:
Я думаю, что вы повторно используете свой
*Operation
, который вызывает проблемы. Например:Http://play.golang.org/p/_MeSBLWPwN
c := make(chan *int, 1) go func() { val := new(int) for i :=0; i<10; i++ { *val = i c <- val } close(c) }() for val := range c { time.Sleep(time.Millisecond * 1) fmt.Println(*val) }
Этот код приводит к следующему:
2 3 4 5 6 7 8 9 9 9
И что еще более важно, это не потокобезопасно. Попробуйте сделать это, возможно:
for { for { var oper *Operation if !iter.Next(&oper) { break } fmt.Println("\n<<", oper.Id) Out <- oper } ... }
Или используйте простой
Operation
вместо*Operation
. (Потому что без указателя значение копируется)
Я думаю, что вы каждый раз десериализуетесь в один и тот же экземпляр структуры, поэтому один и тот же объект читается каналом и переписывается отправителем. Попробуйте просто переместить его инициализацию в цикл, чтобы каждый раз создавать новый.
Вы также можете запустить этот код с
go run -race
илиgo build -race
, он предупреждает о таких вещах.