more tweaks to throttler & commit vars.

This commit is contained in:
Sugu Sougoumarane 2013-10-11 10:22:18 -07:00
Родитель 2504714061
Коммит d173b90335
3 изменённых файлов: 18 добавлений и 15 удалений

Просмотреть файл

@ -22,9 +22,8 @@ func NewThrottledListener(l net.Listener, maxRate int64) net.Listener {
return &ThrottledListener{l, time.Duration(1e9 / maxRate)}
}
// Accept accepts a new connection only if the accept rate
// will not exceed the throttling limit. Otherwise, it waits
// before accepting.
// Accept accepts a new connection, but ensures that te
// rate does not exceed the specified maxRate.
func (tln *ThrottledListener) Accept() (c net.Conn, err error) {
// We assume Accept is called in a tight loop.
// So we can just sleep for minDelay

Просмотреть файл

@ -43,20 +43,22 @@ const (
)
type ActiveTxPool struct {
pool *pools.Numbered
lastId sync2.AtomicInt64
timeout sync2.AtomicDuration
ticks *timer.Timer
txStats *stats.Timings
pool *pools.Numbered
lastId sync2.AtomicInt64
timeout sync2.AtomicDuration
ticks *timer.Timer
txStats *stats.Timings
completionStats *stats.Timings
}
func NewActiveTxPool(name string, timeout time.Duration) *ActiveTxPool {
axp := &ActiveTxPool{
pool: pools.NewNumbered(),
lastId: sync2.AtomicInt64(time.Now().UnixNano()),
timeout: sync2.AtomicDuration(timeout),
ticks: timer.NewTimer(timeout / 10),
txStats: stats.NewTimings("Transactions"),
pool: pools.NewNumbered(),
lastId: sync2.AtomicInt64(time.Now().UnixNano()),
timeout: sync2.AtomicDuration(timeout),
ticks: timer.NewTimer(timeout / 10),
txStats: stats.NewTimings("Transactions"),
completionStats: stats.NewTimings("TransactionCompletion"),
}
stats.Publish(name+"Size", stats.IntFunc(axp.pool.Size))
stats.Publish(
@ -109,7 +111,7 @@ func (axp *ActiveTxPool) SafeCommit(transactionId int64) (invalidList map[string
conn := axp.Get(transactionId)
defer conn.discard(TX_COMMIT)
axp.txStats.Add("Completed", time.Now().Sub(conn.startTime))
defer axp.txStats.Record("Commit", time.Now())
defer axp.completionStats.Record("Commit", time.Now())
if _, err = conn.ExecuteFetch(COMMIT, 1, false); err != nil {
conn.Close()
}
@ -120,6 +122,7 @@ func (axp *ActiveTxPool) Rollback(transactionId int64) {
conn := axp.Get(transactionId)
defer conn.discard(TX_ROLLBACK)
axp.txStats.Add("Aborted", time.Now().Sub(conn.startTime))
defer axp.completionStats.Record("Rollback", time.Now())
if _, err := conn.ExecuteFetch(ROLLBACK, 1, false); err != nil {
conn.Close()
panic(NewTabletErrorSql(FAIL, err))

Просмотреть файл

@ -61,7 +61,7 @@ class TestNocache(framework.TestCase):
# We should have at least one connection
self.assertEqual(vstart.mget("Transactions.TotalCount", 0)+2, vend.Transactions.TotalCount)
self.assertEqual(vstart.mget("Transactions.Histograms.Completed.Count", 0)+2, vend.Transactions.Histograms.Completed.Count)
self.assertEqual(vstart.mget("Transactions.Histograms.Commit.Count", 0)+2, vend.Transactions.Histograms.Commit.Count)
self.assertEqual(vstart.mget("TransactionCompletion.Histograms.Commit.Count", 0)+2, vend.TransactionCompletion.Histograms.Commit.Count)
self.assertEqual(vstart.mget("Queries.TotalCount", 0)+4, vend.Queries.TotalCount)
self.assertEqual(vstart.mget("Queries.Histograms.INSERT_PK.Count", 0)+1, vend.Queries.Histograms.INSERT_PK.Count)
self.assertEqual(vstart.mget("Queries.Histograms.DML_PK.Count", 0)+1, vend.Queries.Histograms.DML_PK.Count)
@ -97,6 +97,7 @@ class TestNocache(framework.TestCase):
vend = self.env.debug_vars()
self.assertEqual(vstart.mget("Transactions.TotalCount", 0)+1, vend.Transactions.TotalCount)
self.assertEqual(vstart.mget("Transactions.Histograms.Aborted.Count", 0)+1, vend.Transactions.Histograms.Aborted.Count)
self.assertEqual(vstart.mget("TransactionCompletion.Histograms.Rollback.Count", 0)+1, vend.TransactionCompletion.Histograms.Rollback.Count)
def test_nontx_dml(self):
vstart = self.env.debug_vars()