af92e38566
From-SVN: r183540
339 lines
8.0 KiB
Go
339 lines
8.0 KiB
Go
// Copyright 2010 The Go Authors. All rights reserved.
|
|
// Use of this source code is governed by a BSD-style
|
|
// license that can be found in the LICENSE file.
|
|
|
|
package netchan
|
|
|
|
import (
|
|
"encoding/gob"
|
|
"errors"
|
|
"io"
|
|
"reflect"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// The direction of a connection from the client's perspective.
|
|
type Dir int
|
|
|
|
const (
|
|
Recv Dir = iota
|
|
Send
|
|
)
|
|
|
|
func (dir Dir) String() string {
|
|
switch dir {
|
|
case Recv:
|
|
return "Recv"
|
|
case Send:
|
|
return "Send"
|
|
}
|
|
return "???"
|
|
}
|
|
|
|
// Payload types
|
|
const (
|
|
payRequest = iota // request structure follows
|
|
payError // error structure follows
|
|
payData // user payload follows
|
|
payAck // acknowledgement; no payload
|
|
payClosed // channel is now closed
|
|
payAckSend // payload has been delivered.
|
|
)
|
|
|
|
// A header is sent as a prefix to every transmission. It will be followed by
|
|
// a request structure, an error structure, or an arbitrary user payload structure.
|
|
type header struct {
|
|
Id int
|
|
PayloadType int
|
|
SeqNum int64
|
|
}
|
|
|
|
// Sent with a header once per channel from importer to exporter to report
|
|
// that it wants to bind to a channel with the specified direction for count
|
|
// messages, with space for size buffered values. If count is -1, it means unlimited.
|
|
type request struct {
|
|
Name string
|
|
Count int64
|
|
Size int
|
|
Dir Dir
|
|
}
|
|
|
|
// Sent with a header to report an error.
|
|
type error_ struct {
|
|
Error string
|
|
}
|
|
|
|
// Used to unify management of acknowledgements for import and export.
|
|
type unackedCounter interface {
|
|
unackedCount() int64
|
|
ack() int64
|
|
seq() int64
|
|
}
|
|
|
|
// A channel and its direction.
|
|
type chanDir struct {
|
|
ch reflect.Value
|
|
dir Dir
|
|
}
|
|
|
|
// clientSet contains the objects and methods needed for tracking
|
|
// clients of an exporter and draining outstanding messages.
|
|
type clientSet struct {
|
|
mu sync.Mutex // protects access to channel and client maps
|
|
names map[string]*chanDir
|
|
clients map[unackedCounter]bool
|
|
}
|
|
|
|
// Mutex-protected encoder and decoder pair.
|
|
type encDec struct {
|
|
decLock sync.Mutex
|
|
dec *gob.Decoder
|
|
encLock sync.Mutex
|
|
enc *gob.Encoder
|
|
}
|
|
|
|
func newEncDec(conn io.ReadWriter) *encDec {
|
|
return &encDec{
|
|
dec: gob.NewDecoder(conn),
|
|
enc: gob.NewEncoder(conn),
|
|
}
|
|
}
|
|
|
|
// Decode an item from the connection.
|
|
func (ed *encDec) decode(value reflect.Value) error {
|
|
ed.decLock.Lock()
|
|
err := ed.dec.DecodeValue(value)
|
|
if err != nil {
|
|
// TODO: tear down connection?
|
|
}
|
|
ed.decLock.Unlock()
|
|
return err
|
|
}
|
|
|
|
// Encode a header and payload onto the connection.
|
|
func (ed *encDec) encode(hdr *header, payloadType int, payload interface{}) error {
|
|
ed.encLock.Lock()
|
|
hdr.PayloadType = payloadType
|
|
err := ed.enc.Encode(hdr)
|
|
if err == nil {
|
|
if payload != nil {
|
|
err = ed.enc.Encode(payload)
|
|
}
|
|
}
|
|
if err != nil {
|
|
// TODO: tear down connection if there is an error?
|
|
}
|
|
ed.encLock.Unlock()
|
|
return err
|
|
}
|
|
|
|
// See the comment for Exporter.Drain.
|
|
func (cs *clientSet) drain(timeout time.Duration) error {
|
|
deadline := time.Now().Add(timeout)
|
|
for {
|
|
pending := false
|
|
cs.mu.Lock()
|
|
// Any messages waiting for a client?
|
|
for _, chDir := range cs.names {
|
|
if chDir.ch.Len() > 0 {
|
|
pending = true
|
|
}
|
|
}
|
|
// Any unacknowledged messages?
|
|
for client := range cs.clients {
|
|
n := client.unackedCount()
|
|
if n > 0 { // Check for > rather than != just to be safe.
|
|
pending = true
|
|
break
|
|
}
|
|
}
|
|
cs.mu.Unlock()
|
|
if !pending {
|
|
break
|
|
}
|
|
if timeout > 0 && time.Now().After(deadline) {
|
|
return errors.New("timeout")
|
|
}
|
|
time.Sleep(100 * time.Millisecond)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// See the comment for Exporter.Sync.
|
|
func (cs *clientSet) sync(timeout time.Duration) error {
|
|
deadline := time.Now().Add(timeout)
|
|
// seq remembers the clients and their seqNum at point of entry.
|
|
seq := make(map[unackedCounter]int64)
|
|
cs.mu.Lock()
|
|
for client := range cs.clients {
|
|
seq[client] = client.seq()
|
|
}
|
|
cs.mu.Unlock()
|
|
for {
|
|
pending := false
|
|
cs.mu.Lock()
|
|
// Any unacknowledged messages? Look only at clients that existed
|
|
// when we started and are still in this client set.
|
|
for client := range seq {
|
|
if _, ok := cs.clients[client]; ok {
|
|
if client.ack() < seq[client] {
|
|
pending = true
|
|
break
|
|
}
|
|
}
|
|
}
|
|
cs.mu.Unlock()
|
|
if !pending {
|
|
break
|
|
}
|
|
if timeout > 0 && time.Now().After(deadline) {
|
|
return errors.New("timeout")
|
|
}
|
|
time.Sleep(100 * time.Millisecond)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// A netChan represents a channel imported or exported
|
|
// on a single connection. Flow is controlled by the receiving
|
|
// side by sending payAckSend messages when values
|
|
// are delivered into the local channel.
|
|
type netChan struct {
|
|
*chanDir
|
|
name string
|
|
id int
|
|
size int // buffer size of channel.
|
|
closed bool
|
|
|
|
// sender-specific state
|
|
ackCh chan bool // buffered with space for all the acks we need
|
|
space int // available space.
|
|
|
|
// receiver-specific state
|
|
sendCh chan reflect.Value // buffered channel of values received from other end.
|
|
ed *encDec // so that we can send acks.
|
|
count int64 // number of values still to receive.
|
|
}
|
|
|
|
// Create a new netChan with the given name (only used for
|
|
// messages), id, direction, buffer size, and count.
|
|
// The connection to the other side is represented by ed.
|
|
func newNetChan(name string, id int, ch *chanDir, ed *encDec, size int, count int64) *netChan {
|
|
c := &netChan{chanDir: ch, name: name, id: id, size: size, ed: ed, count: count}
|
|
if c.dir == Send {
|
|
c.ackCh = make(chan bool, size)
|
|
c.space = size
|
|
}
|
|
return c
|
|
}
|
|
|
|
// Close the channel.
|
|
func (nch *netChan) close() {
|
|
if nch.closed {
|
|
return
|
|
}
|
|
if nch.dir == Recv {
|
|
if nch.sendCh != nil {
|
|
// If the sender goroutine is active, close the channel to it.
|
|
// It will close nch.ch when it can.
|
|
close(nch.sendCh)
|
|
} else {
|
|
nch.ch.Close()
|
|
}
|
|
} else {
|
|
nch.ch.Close()
|
|
close(nch.ackCh)
|
|
}
|
|
nch.closed = true
|
|
}
|
|
|
|
// Send message from remote side to local receiver.
|
|
func (nch *netChan) send(val reflect.Value) {
|
|
if nch.dir != Recv {
|
|
panic("send on wrong direction of channel")
|
|
}
|
|
if nch.sendCh == nil {
|
|
// If possible, do local send directly and ack immediately.
|
|
if nch.ch.TrySend(val) {
|
|
nch.sendAck()
|
|
return
|
|
}
|
|
// Start sender goroutine to manage delayed delivery of values.
|
|
nch.sendCh = make(chan reflect.Value, nch.size)
|
|
go nch.sender()
|
|
}
|
|
select {
|
|
case nch.sendCh <- val:
|
|
// ok
|
|
default:
|
|
// TODO: should this be more resilient?
|
|
panic("netchan: remote sender sent more values than allowed")
|
|
}
|
|
}
|
|
|
|
// sendAck sends an acknowledgment that a message has left
|
|
// the channel's buffer. If the messages remaining to be sent
|
|
// will fit in the channel's buffer, then we don't
|
|
// need to send an ack.
|
|
func (nch *netChan) sendAck() {
|
|
if nch.count < 0 || nch.count > int64(nch.size) {
|
|
nch.ed.encode(&header{Id: nch.id}, payAckSend, nil)
|
|
}
|
|
if nch.count > 0 {
|
|
nch.count--
|
|
}
|
|
}
|
|
|
|
// The sender process forwards items from the sending queue
|
|
// to the destination channel, acknowledging each item.
|
|
func (nch *netChan) sender() {
|
|
if nch.dir != Recv {
|
|
panic("sender on wrong direction of channel")
|
|
}
|
|
// When Exporter.Hangup is called, the underlying channel is closed,
|
|
// and so we may get a "too many operations on closed channel" error
|
|
// if there are outstanding messages in sendCh.
|
|
// Make sure that this doesn't panic the whole program.
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
// TODO check that r is "too many operations", otherwise re-panic.
|
|
}
|
|
}()
|
|
for v := range nch.sendCh {
|
|
nch.ch.Send(v)
|
|
nch.sendAck()
|
|
}
|
|
nch.ch.Close()
|
|
}
|
|
|
|
// Receive value from local side for sending to remote side.
|
|
func (nch *netChan) recv() (val reflect.Value, ok bool) {
|
|
if nch.dir != Send {
|
|
panic("recv on wrong direction of channel")
|
|
}
|
|
|
|
if nch.space == 0 {
|
|
// Wait for buffer space.
|
|
<-nch.ackCh
|
|
nch.space++
|
|
}
|
|
nch.space--
|
|
return nch.ch.Recv()
|
|
}
|
|
|
|
// acked is called when the remote side indicates that
|
|
// a value has been delivered.
|
|
func (nch *netChan) acked() {
|
|
if nch.dir != Send {
|
|
panic("recv on wrong direction of channel")
|
|
}
|
|
select {
|
|
case nch.ackCh <- true:
|
|
// ok
|
|
default:
|
|
// TODO: should this be more resilient?
|
|
panic("netchan: remote receiver sent too many acks")
|
|
}
|
|
}
|