Created
October 25, 2019 23:23
-
-
Save caelifer/70864cb13e9c12842059daba7a144ca3 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package disruptor | |
import ( | |
"encoding/binary" | |
"log" | |
"runtime" | |
"sync/atomic" | |
"time" | |
) | |
const ( | |
RingBufferSize = 8 * 64 // 8 64-byte cache lines | |
SizeOfInt = 8 | |
) | |
type Offset = uint64 | |
type ring struct { | |
// Make sure 64-byte alignment | |
buf [RingBufferSize]byte | |
dirty Offset | |
clean Offset | |
datumSz Offset // max 264 bytes (4*64) | |
retries int64 | |
} | |
func New(datumSize Offset) *ring { | |
return &ring{datumSz: datumSize} | |
} | |
// commit should be inlined by the compiler | |
func (r *ring) commit(oldDirty, newOffset Offset) bool { | |
clean := atomic.LoadUint64(&r.clean) | |
return atomic.CompareAndSwapUint64(&r.dirty, oldDirty, newOffset) && | |
atomic.CompareAndSwapUint64(&r.clean, clean, newOffset) | |
} | |
func (r *ring) write(data []byte) int { | |
sz := Offset(len(data)) | |
if sz > r.datumSz { | |
panic("unsupported data chunk size") | |
} | |
// Try until we succeed | |
for { | |
// Load stored area | |
dirty := atomic.LoadUint64(&r.dirty) | |
// Calculate new offset | |
off := (dirty + sz) % r.datumSz | |
// Copy data to the new slot | |
_ = copy(r.buf[off:], data[:r.datumSz]) | |
// Break if we are able to commit | |
if r.commit(dirty, off) { | |
break | |
} | |
atomic.AddInt64(&r.retries, 1) | |
} | |
return int(sz) | |
} | |
// Read returns data chunk available to read. It may return zeroed memory if there is no available | |
// data in the buffer. | |
func (r *ring) Read() []byte { | |
return r.buf[atomic.LoadUint64(&r.clean):r.datumSz] | |
} | |
func TestDisruptor(samples int, wait *int64) *ring { | |
data := make([]byte, SizeOfInt) | |
r := New(SizeOfInt) // | |
go func(t0 time.Time) { | |
for i := 0; i < samples; i++ { | |
binary.LittleEndian.PutUint64(data, uint64(i)) | |
r.write(data) | |
} | |
elapsed := time.Since(t0) | |
log.Printf("writer: finished writing %dM samples in %v [%.2f samples/s] (with %d contentions)", | |
samples/1000000, elapsed, float64(samples)/elapsed.Seconds(), atomic.LoadInt64(&r.retries)) | |
atomic.AddInt64(wait, -1) | |
}(time.Now()) | |
return r | |
} | |
func WriteN(n int, wait *int64) *ring { | |
data := make([]byte, SizeOfInt) | |
r := New(SizeOfInt) // | |
go func() { | |
for i := 10; i < 10+n; i++ { | |
binary.LittleEndian.PutUint64(data, uint64(i)) | |
r.write(data) | |
runtime.Gosched() | |
} | |
atomic.AddInt64(wait, -1) | |
}() | |
return r | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
work in progress