Files
tsnet-proxy/vendor/github.com/akutz/memconn/memconn_conn.go
2024-11-01 17:43:06 +00:00

435 lines
11 KiB
Go

package memconn
import (
"net"
"sync"
"time"
)
// Conn is an in-memory implementation of Golang's "net.Conn" interface.
//
// It embeds a pipe for the underlying data path and layers address
// information and optional buffered-write state on top of it.
type Conn struct {
pipe
// laddr is the address of this side of the connection. It is the
// value returned by LocalAddr and its Buffered() flag decides whether
// Write operations on this Conn are asynchronous.
laddr Addr
// raddr is the address of the other side of the connection. It is
// the value returned by RemoteAddr.
raddr Addr
// buf contains information about the connection's buffer state if
// the connection is buffered. Otherwise this field is nil.
buf *bufConn
}
// bufConn holds the state a buffered Conn uses to track, limit and
// report on asynchronous Write operations.
type bufConn struct {
// Please see the SetCopyOnWrite function for more information.
cow bool
// Please see the SetBufferSize function for more information.
max uint64
// cur is the amount of buffered, pending Write data, in bytes.
cur uint64
// cond is a condition used to wait when writing buffered data.
// makeNewConns sets cond.L to point at mu.
cond sync.Cond
// mu is the mutex used by the condition. The mutex is exposed
// directly in order to access RLock and RUnlock for getting the
// buffer size.
mu sync.RWMutex
// errs is the error channel returned by the Errs() function and
// used to report errors that occur as a result of buffered write
// operations. makeNewConns only allocates a bufConn (and therefore
// this channel) for buffered connections.
errs chan error
// Please see the SetCloseTimeout function for more information.
closeTimeout time.Duration
}
// makeNewConns builds the two cross-wired endpoints of an in-memory
// connection. The first result is the local side (local address laddr,
// remote address raddr); the second result is the remote side with the
// addresses swapped. A side receives buffered-write state only when its
// own local address reports Buffered().
//
// The network argument is not used by this function.
func makeNewConns(network string, laddr, raddr Addr) (*Conn, *Conn) {
	// This code is duplicated from the Pipe() function from the file
	// "memconn_pipe.go". The reason for the duplication is to optimize
	// the performance by removing the need to wrap the *pipe values as
	// interface{} objects out of the Pipe() function and assert them
	// back as *pipe objects in this function.
	var (
		// Payloads and byte counts for data read by the local side.
		localData  = make(chan []byte)
		localCount = make(chan int)

		// Payloads and byte counts for data read by the remote side.
		remoteData  = make(chan []byte)
		remoteCount = make(chan int)

		// Closed when the respective side is closed.
		localClosed  = make(chan struct{})
		remoteClosed = make(chan struct{})
	)

	// newBufConn creates the buffered-write state with the default
	// close timeout and binds the condition variable to the mutex.
	newBufConn := func() *bufConn {
		bc := &bufConn{
			errs:         make(chan error),
			closeTimeout: 10 * time.Second,
		}
		bc.cond.L = &bc.mu
		return bc
	}

	// Wrap the pipes with Conn to support:
	//
	//   * LocalAddr() and RemoteAddr() returning the correct address
	//     information.
	//   * Errors returned from the internal pipe being checked and
	//     having their internal OpError addr information replaced with
	//     the correct address information.
	//   * A channel that can be set up so that the Listener closing
	//     closes the remote Conn immediately.
	//   * Buffered writes.
	local := &Conn{laddr: laddr, raddr: raddr}
	local.pipe = pipe{
		rdRx: localData, rdTx: localCount,
		wrTx: remoteData, wrRx: remoteCount,
		localDone: localClosed, remoteDone: remoteClosed,
		readDeadline:  makePipeDeadline(),
		writeDeadline: makePipeDeadline(),
	}

	remote := &Conn{laddr: raddr, raddr: laddr}
	remote.pipe = pipe{
		rdRx: remoteData, rdTx: remoteCount,
		wrTx: localData, wrRx: localCount,
		localDone: remoteClosed, remoteDone: localClosed,
		readDeadline:  makePipeDeadline(),
		writeDeadline: makePipeDeadline(),
	}

	if laddr.Buffered() {
		local.buf = newBufConn()
	}
	if raddr.Buffered() {
		remote.buf = newBufConn()
	}

	return local, remote
}
// LocalBuffered reports whether the local side of the connection is
// buffered, as indicated by the connection's local address.
func (c *Conn) LocalBuffered() bool {
	buffered := c.laddr.Buffered()
	return buffered
}
// RemoteBuffered reports whether the remote side of the connection is
// buffered, as indicated by the connection's remote address.
func (c *Conn) RemoteBuffered() bool {
	buffered := c.raddr.Buffered()
	return buffered
}
// BufferSize returns the number of bytes allowed to be queued for
// asynchronous Write operations.
//
// Unbuffered connections always report zero.
//
// Please see the function SetBufferSize for more information.
func (c *Conn) BufferSize() uint64 {
	if !c.laddr.Buffered() {
		return 0
	}

	// A read lock is sufficient; the value is only being observed.
	c.buf.mu.RLock()
	size := c.buf.max
	c.buf.mu.RUnlock()

	return size
}
// SetBufferSize sets the number of bytes allowed to be queued for
// asynchronous Write operations. Once the amount of data pending a
// Write operation exceeds the specified size, subsequent Writes block
// until the queued data no longer exceeds the allowed ceiling.
//
// A value of zero means no maximum is defined.
//
// If a Write operation's payload length exceeds a non-zero buffer size
// then that Write operation is handled synchronously.
//
// Setting the buffer size has no effect on unbuffered connections.
func (c *Conn) SetBufferSize(i uint64) {
	if !c.laddr.Buffered() {
		return
	}

	// The condition's locker guards the buffer state for writers.
	guard := c.buf.cond.L
	guard.Lock()
	c.buf.max = i
	guard.Unlock()
}
// CloseTimeout returns the time.Duration value used when closing
// buffered connections.
//
// Unbuffered connections always report zero.
//
// Please see the function SetCloseTimeout for more information.
func (c *Conn) CloseTimeout() time.Duration {
	if !c.laddr.Buffered() {
		return 0
	}

	// A read lock is sufficient; the value is only being observed.
	c.buf.mu.RLock()
	timeout := c.buf.closeTimeout
	c.buf.mu.RUnlock()

	return timeout
}
// SetCloseTimeout sets the time.Duration value used by the Close
// function to decide how long to wait for pending, buffered Writes to
// complete before closing the connection.
//
// The default timeout value is 10 seconds. A zero value does not mean
// there is no timeout; it means the timeout is immediate.
//
// Setting this value has no effect on unbuffered connections.
func (c *Conn) SetCloseTimeout(duration time.Duration) {
	if !c.laddr.Buffered() {
		return
	}

	// The condition's locker guards the buffer state for writers.
	guard := c.buf.cond.L
	guard.Lock()
	c.buf.closeTimeout = duration
	guard.Unlock()
}
// CopyOnWrite reports whether copy-on-write is enabled for this
// connection.
//
// Unbuffered connections always report false.
//
// Please see the function SetCopyOnWrite for more information.
func (c *Conn) CopyOnWrite() bool {
	if !c.laddr.Buffered() {
		return false
	}

	// A read lock is sufficient; the value is only being observed.
	c.buf.mu.RLock()
	enabled := c.buf.cow
	c.buf.mu.RUnlock()

	return enabled
}
// SetCopyOnWrite enables or disables copy-on-write for this connection.
//
// When a connection is buffered, data submitted to a Write operation
// is processed in a goroutine and control returns to the caller
// immediately. The caller could therefore modify the data handed to
// Write before or during the actual Write operation. Enabling
// copy-on-write causes the payload to be copied to a new buffer before
// control is returned to the caller.
//
// Enabling copy-on-write doubles the amount of memory required for all
// Write operations.
//
// Enabling copy-on-write has no effect on unbuffered connections.
func (c *Conn) SetCopyOnWrite(enabled bool) {
	if !c.laddr.Buffered() {
		return
	}

	// The condition's locker guards the buffer state for writers.
	guard := c.buf.cond.L
	guard.Lock()
	c.buf.cow = enabled
	guard.Unlock()
}
// LocalAddr implements the net.Conn LocalAddr method by returning the
// connection's local address.
func (c *Conn) LocalAddr() net.Addr {
	var local net.Addr = c.laddr
	return local
}
// RemoteAddr implements the net.Conn RemoteAddr method by returning the
// connection's remote address.
func (c *Conn) RemoteAddr() net.Addr {
	var remote net.Addr = c.raddr
	return remote
}
// Close implements the net.Conn Close method.
//
// The close logic runs at most once per connection; subsequent calls
// are no-ops. Close always returns nil.
//
// For buffered connections Close blocks until the first of the
// following occurs: all pending Writes have completed, the configured
// close timeout has elapsed, or the remote side of the connection has
// been closed. Unbuffered connections are closed immediately.
func (c *Conn) Close() error {
	c.pipe.once.Do(func() {
		// Buffered connections will attempt to wait until all
		// pending Writes are completed, until the specified
		// timeout value has elapsed, or until the remote side
		// of the connection is closed.
		if c.laddr.Buffered() {
			c.buf.mu.RLock()
			timeout := c.buf.closeTimeout
			c.buf.mu.RUnlock()

			// Set up a channel that is closed when the specified
			// timer elapses. A zero timeout means "do not wait".
			timeoutDone := make(chan struct{})
			if timeout == 0 {
				close(timeoutDone)
			} else {
				timer := time.AfterFunc(timeout, func() { close(timeoutDone) })
				// Release the timer as soon as the wait is over
				// instead of leaving it pending for the rest of the
				// timeout when Close finishes early.
				defer timer.Stop()
			}

			// Set up a channel that is closed when the number of
			// pending bytes is zero. This goroutine keeps waiting
			// on the condition even if Close returns first through
			// one of the other select cases.
			writesDone := make(chan struct{})
			go func() {
				c.buf.cond.L.Lock()
				for c.buf.cur > 0 {
					c.buf.cond.Wait()
				}
				close(writesDone)
				c.buf.cond.L.Unlock()
			}()

			// Wait to close the connection.
			select {
			case <-writesDone:
			case <-timeoutDone:
			case <-c.pipe.remoteDone:
			}
		}

		close(c.pipe.localDone)
	})
	return nil
}
// Errs returns a channel that receives errors that may occur as the
// result of buffered write operations.
//
// This function will always return nil for unbuffered connections.
//
// Please note that the channel returned by this function is not closed
// when the connection is closed. This is because errors may continue
// to be sent over this channel as the result of asynchronous writes
// occurring after the connection is closed. Therefore this channel
// should not be used to determine when the connection is closed.
func (c *Conn) Errs() <-chan error {
	// Unbuffered connections carry no buffer state at all, so guard
	// against dereferencing the nil pointer.
	if c.buf == nil {
		return nil
	}
	return c.buf.errs
}
// Read implements the net.Conn Read method.
//
// Any error returned by the underlying pipe is reported as a
// *net.OpError whose Addr is the remote address and whose Source is
// the local address. An error that already is a *net.OpError is
// updated in place; any other error is wrapped in a new "read"
// OpError.
func (c *Conn) Read(b []byte) (int, error) {
	n, err := c.pipe.Read(b)
	if err == nil {
		return n, nil
	}

	opErr, isOpErr := err.(*net.OpError)
	if !isOpErr {
		opErr = &net.OpError{
			Op:  "read",
			Net: c.raddr.Network(),
			Err: err,
		}
	}
	opErr.Addr = c.raddr
	opErr.Source = c.laddr
	return n, opErr
}
// Write implements the net.Conn Write method. Connections whose local
// address is buffered write asynchronously; all others write
// synchronously.
func (c *Conn) Write(b []byte) (int, error) {
	if !c.laddr.Buffered() {
		return c.writeSync(b)
	}
	return c.writeAsync(b)
}
// writeSync writes b to the underlying pipe and does not return until
// the pipe's Write returns.
//
// Any error is reported as a *net.OpError whose Addr is the remote
// address and whose Source is the local address. An error that already
// is a *net.OpError is updated in place; any other error is wrapped in
// a new "write" OpError.
func (c *Conn) writeSync(b []byte) (int, error) {
	n, err := c.pipe.Write(b)
	if err == nil {
		return n, nil
	}

	switch e := err.(type) {
	case *net.OpError:
		e.Addr = c.raddr
		e.Source = c.laddr
		return n, e
	default:
		return n, &net.OpError{
			Op:     "write",
			Addr:   c.raddr,
			Source: c.laddr,
			Net:    c.raddr.Network(),
			Err:    err,
		}
	}
}
// writeAsync performs the Write operation in a goroutine. This
// behavior means the Write operation is not blocking, but also means
// that when Write operations fail the associated error is not returned
// from this function; it is sent to the channel returned by Errs
// instead.
//
// If a non-zero maximum buffer size is configured:
//
//   - a payload larger than the maximum is written synchronously, and
//   - any other payload blocks until it fits into the remaining buffer
//     space.
//
// On the asynchronous path the function returns len(b) and a nil
// error.
func (c *Conn) writeAsync(b []byte) (int, error) {
	size := uint64(len(b))

	// All buffer state, including the configured maximum, is read
	// while holding the lock so that concurrent calls to SetBufferSize
	// and SetCopyOnWrite are observed safely.
	c.buf.cond.L.Lock()

	// Block the operation from proceeding until there is available
	// buffer space.
	for c.buf.max > 0 && size+c.buf.cur > c.buf.max {
		// Perform a synchronous Write if the payload exceeds the
		// maximum allowed buffer size. The check is repeated on every
		// wake-up because the maximum may have been lowered while
		// waiting, in which case the payload would never fit.
		if size > c.buf.max {
			c.buf.cond.L.Unlock()
			return c.writeSync(b)
		}
		c.buf.cond.Wait()
	}

	// Copy the buffer if the connection uses copy-on-write.
	cb := b
	if c.buf.cow {
		cb = make([]byte, len(b))
		copy(cb, b)
	}

	// Update the amount of active data being written.
	c.buf.cur += uint64(len(cb))
	c.buf.cond.L.Unlock()

	go func() {
		if _, err := c.writeSync(cb); err != nil {
			// Report the error without blocking this goroutine on a
			// receiver of the error channel.
			go func() { c.buf.errs <- err }()
		}

		// Decrement the enqueued buffer size and wake the waiters.
		// Writers waiting for space and a Close call waiting for the
		// queue to drain share this condition but wait on different
		// predicates, so every waiter must get the chance to re-check
		// its own.
		c.buf.cond.L.Lock()
		c.buf.cur -= uint64(len(cb))
		c.buf.cond.L.Unlock()
		c.buf.cond.Broadcast()
	}()

	return len(cb), nil
}
// SetReadDeadline implements the net.Conn SetReadDeadline method.
//
// Any error from the underlying pipe is reported as a *net.OpError
// whose Addr and Source are both the local address. An error that
// already is a *net.OpError is updated in place; any other error is
// wrapped in a new "setReadDeadline" OpError.
func (c *Conn) SetReadDeadline(t time.Time) error {
	err := c.pipe.SetReadDeadline(t)
	if err == nil {
		return nil
	}

	switch e := err.(type) {
	case *net.OpError:
		e.Addr = c.laddr
		e.Source = c.laddr
		return e
	default:
		return &net.OpError{
			Op:     "setReadDeadline",
			Addr:   c.laddr,
			Source: c.laddr,
			Net:    c.laddr.Network(),
			Err:    err,
		}
	}
}
// SetWriteDeadline implements the net.Conn SetWriteDeadline method.
//
// Any error from the underlying pipe is reported as a *net.OpError
// whose Addr and Source are both the local address. An error that
// already is a *net.OpError is updated in place; any other error is
// wrapped in a new "setWriteDeadline" OpError.
func (c *Conn) SetWriteDeadline(t time.Time) error {
	err := c.pipe.SetWriteDeadline(t)
	if err == nil {
		return nil
	}

	switch e := err.(type) {
	case *net.OpError:
		e.Addr = c.laddr
		e.Source = c.laddr
		return e
	default:
		return &net.OpError{
			Op:     "setWriteDeadline",
			Addr:   c.laddr,
			Source: c.laddr,
			Net:    c.laddr.Network(),
			Err:    err,
		}
	}
}