You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

662 lines
19 KiB

  1. // Copyright 2015 Google Inc. All rights reserved.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package trafficshape
  15. import (
  16. "io"
  17. "net"
  18. "sort"
  19. "sync"
  20. "time"
  21. "github.com/google/martian/log"
  22. )
  23. // ErrForceClose is an error that communicates the need to close the connection.
  24. type ErrForceClose struct {
  25. message string
  26. }
  27. func (efc *ErrForceClose) Error() string {
  28. return efc.message
  29. }
  30. // DefaultBitrate represents the bitrate that will be for all url regexs for which a shape
  31. // has not been specified.
  32. var DefaultBitrate int64 = 500000000000 // 500Gbps (unlimited)
  33. // urlShape contains a rw lock protected shape of a url_regex.
  34. type urlShape struct {
  35. sync.RWMutex
  36. Shape *Shape
  37. }
  38. // urlShapes contains a rw lock protected map of url regexs to their URLShapes.
  39. type urlShapes struct {
  40. sync.RWMutex
  41. M map[string]*urlShape
  42. LastModifiedTime time.Time
  43. }
  44. // Buckets contains the read and write buckets for a url_regex.
  45. type Buckets struct {
  46. ReadBucket *Bucket
  47. WriteBucket *Bucket
  48. }
  49. // NewBuckets returns a *Buckets with the specified up and down bandwidths.
  50. func NewBuckets(up int64, down int64) *Buckets {
  51. return &Buckets{
  52. ReadBucket: NewBucket(up, time.Second),
  53. WriteBucket: NewBucket(down, time.Second),
  54. }
  55. }
  56. // ThrottleContext represents whether we are currently in a throttle interval for a particular
  57. // url_regex. If ThrottleNow is true, only then will the current throttle 'Bandwidth' be set
  58. // correctly.
  59. type ThrottleContext struct {
  60. ThrottleNow bool
  61. Bandwidth int64
  62. }
  63. // NextActionInfo represents whether there is an upcoming action. Only if ActionNext is true will the
  64. // Index and ByteOffset be set correctly.
  65. type NextActionInfo struct {
  66. ActionNext bool
  67. Index int64
  68. ByteOffset int64
  69. }
  70. // Context represents the current information that is needed while writing back to the client.
  71. // Only if Shaping is true, that is we are currently writing back a response that matches a certain
  72. // url_regex will the other values be set correctly. If so, the Buckets represent the buckets
  73. // to be used for the current url_regex. NextActionInfo tells us whether there is an upcoming action
  74. // that needs to be performed, and ThrottleContext tells us whether we are currently in a throttle
  75. // interval (according to the RangeStart). Note, the ThrottleContext is only used once in the start
  76. // to determine the beginning bandwidth. It need not be updated after that. This
  77. // is because the subsequent throttles are captured in the upcoming ChangeBandwidth actions.
  78. // Byte Offset represents the absolute byte offset of response data that we are currently writing back.
  79. // It does not account for the header data.
  80. type Context struct {
  81. Shaping bool
  82. RangeStart int64
  83. URLRegex string
  84. Buckets *Buckets
  85. GlobalBucket *Bucket
  86. ThrottleContext *ThrottleContext
  87. NextActionInfo *NextActionInfo
  88. ByteOffset int64
  89. HeaderLen int64
  90. HeaderBytesWritten int64
  91. }
  92. // Listener wraps a net.Listener and simulates connection latency and bandwidth
  93. // constraints.
  94. type Listener struct {
  95. net.Listener
  96. ReadBucket *Bucket
  97. WriteBucket *Bucket
  98. mu sync.RWMutex
  99. latency time.Duration
  100. GlobalBuckets map[string]*Bucket
  101. Shapes *urlShapes
  102. defaults *Default
  103. }
  104. // Conn wraps a net.Conn and simulates connection latency and bandwidth
  105. // constraints. Shapes represents the traffic shape map inherited from the listener.
  106. // Established is the time that this connection was established. LocalBuckets represents a map from
  107. // the url_regexes to their dedicated buckets.
  108. type Conn struct {
  109. net.Conn
  110. ReadBucket *Bucket // Shared by listener.
  111. WriteBucket *Bucket // Shared by listener.
  112. latency time.Duration
  113. ronce sync.Once
  114. wonce sync.Once
  115. Shapes *urlShapes
  116. GlobalBuckets map[string]*Bucket
  117. LocalBuckets map[string]*Buckets
  118. Established time.Time
  119. Context *Context
  120. DefaultBandwidth Bandwidth
  121. Listener *Listener
  122. }
  123. // NewListener returns a new bandwidth constrained listener. Defaults to
  124. // DefaultBitrate (uncapped).
  125. func NewListener(l net.Listener) *Listener {
  126. return &Listener{
  127. Listener: l,
  128. ReadBucket: NewBucket(DefaultBitrate/8, time.Second),
  129. WriteBucket: NewBucket(DefaultBitrate/8, time.Second),
  130. Shapes: &urlShapes{M: make(map[string]*urlShape)},
  131. GlobalBuckets: make(map[string]*Bucket),
  132. defaults: &Default{
  133. Bandwidth: Bandwidth{
  134. Up: DefaultBitrate / 8,
  135. Down: DefaultBitrate / 8,
  136. },
  137. Latency: 0,
  138. },
  139. }
  140. }
  141. // ReadBitrate returns the bitrate in bits per second for reads.
  142. func (l *Listener) ReadBitrate() int64 {
  143. return l.ReadBucket.Capacity() * 8
  144. }
  145. // SetReadBitrate sets the bitrate in bits per second for reads.
  146. func (l *Listener) SetReadBitrate(bitrate int64) {
  147. l.ReadBucket.SetCapacity(bitrate / 8)
  148. }
  149. // WriteBitrate returns the bitrate in bits per second for writes.
  150. func (l *Listener) WriteBitrate() int64 {
  151. return l.WriteBucket.Capacity() * 8
  152. }
  153. // SetWriteBitrate sets the bitrate in bits per second for writes.
  154. func (l *Listener) SetWriteBitrate(bitrate int64) {
  155. l.WriteBucket.SetCapacity(bitrate / 8)
  156. }
  157. // SetDefaults sets the default traffic shaping parameters for the listener.
  158. func (l *Listener) SetDefaults(defaults *Default) {
  159. l.mu.Lock()
  160. defer l.mu.Unlock()
  161. l.defaults = defaults
  162. }
  163. // Defaults returns the default traffic shaping parameters for the listener.
  164. func (l *Listener) Defaults() *Default {
  165. l.mu.RLock()
  166. defer l.mu.RUnlock()
  167. return l.defaults
  168. }
  169. // Latency returns the latency for connections.
  170. func (l *Listener) Latency() time.Duration {
  171. l.mu.Lock()
  172. defer l.mu.Unlock()
  173. return l.latency
  174. }
  175. // SetLatency sets the initial latency for connections.
  176. func (l *Listener) SetLatency(latency time.Duration) {
  177. l.mu.Lock()
  178. defer l.mu.Unlock()
  179. l.latency = latency
  180. }
  181. // GetTrafficShapedConn takes in a normal connection and returns a traffic shaped connection.
  182. func (l *Listener) GetTrafficShapedConn(oc net.Conn) *Conn {
  183. if tsconn, ok := oc.(*Conn); ok {
  184. return tsconn
  185. }
  186. urlbuckets := make(map[string]*Buckets)
  187. globalurlbuckets := make(map[string]*Bucket)
  188. l.Shapes.RLock()
  189. defaults := l.Defaults()
  190. latency := l.Latency()
  191. defaultBandwidth := defaults.Bandwidth
  192. for regex, shape := range l.Shapes.M {
  193. // It should be ok to not acquire the read lock on shape, since WriteBucket is never mutated.
  194. globalurlbuckets[regex] = shape.Shape.WriteBucket
  195. urlbuckets[regex] = NewBuckets(DefaultBitrate/8, shape.Shape.MaxBandwidth)
  196. }
  197. l.Shapes.RUnlock()
  198. curinfo := &Context{}
  199. lc := &Conn{
  200. Conn: oc,
  201. latency: latency,
  202. ReadBucket: l.ReadBucket,
  203. WriteBucket: l.WriteBucket,
  204. Shapes: l.Shapes,
  205. GlobalBuckets: globalurlbuckets,
  206. LocalBuckets: urlbuckets,
  207. Context: curinfo,
  208. Established: time.Now(),
  209. DefaultBandwidth: defaultBandwidth,
  210. Listener: l,
  211. }
  212. return lc
  213. }
  214. // Accept waits for and returns the next connection to the listener.
  215. func (l *Listener) Accept() (net.Conn, error) {
  216. oc, err := l.Listener.Accept()
  217. if err != nil {
  218. log.Errorf("trafficshape: failed accepting connection: %v", err)
  219. return nil, err
  220. }
  221. if tconn, ok := oc.(*net.TCPConn); ok {
  222. log.Debugf("trafficshape: setting keep-alive for TCP connection")
  223. tconn.SetKeepAlive(true)
  224. tconn.SetKeepAlivePeriod(3 * time.Minute)
  225. }
  226. return l.GetTrafficShapedConn(oc), nil
  227. }
  228. // Close closes the read and write buckets along with the underlying listener.
  229. func (l *Listener) Close() error {
  230. defer log.Debugf("trafficshape: closed read/write buckets and connection")
  231. l.ReadBucket.Close()
  232. l.WriteBucket.Close()
  233. return l.Listener.Close()
  234. }
  235. // Read reads bytes from connection into b, optionally simulating connection
  236. // latency and throttling read throughput based on desired bandwidth
  237. // constraints.
  238. func (c *Conn) Read(b []byte) (int, error) {
  239. c.ronce.Do(c.sleepLatency)
  240. n, err := c.ReadBucket.FillThrottle(func(remaining int64) (int64, error) {
  241. max := remaining
  242. if l := int64(len(b)); max > l {
  243. max = l
  244. }
  245. n, err := c.Conn.Read(b[:max])
  246. return int64(n), err
  247. })
  248. if err != nil && err != io.EOF {
  249. log.Errorf("trafficshape: error on throttled read: %v", err)
  250. }
  251. return int(n), err
  252. }
  253. // ReadFrom reads data from r until EOF or error, optionally simulating
  254. // connection latency and throttling read throughput based on desired bandwidth
  255. // constraints.
  256. func (c *Conn) ReadFrom(r io.Reader) (int64, error) {
  257. c.ronce.Do(c.sleepLatency)
  258. var total int64
  259. for {
  260. n, err := c.ReadBucket.FillThrottle(func(remaining int64) (int64, error) {
  261. return io.CopyN(c.Conn, r, remaining)
  262. })
  263. total += n
  264. if err == io.EOF {
  265. log.Debugf("trafficshape: exhausted reader successfully")
  266. return total, nil
  267. } else if err != nil {
  268. log.Errorf("trafficshape: failed copying from reader: %v", err)
  269. return total, err
  270. }
  271. }
  272. }
  273. // WriteTo writes data to w from the connection, optionally simulating
  274. // connection latency and throttling write throughput based on desired
  275. // bandwidth constraints.
  276. func (c *Conn) WriteTo(w io.Writer) (int64, error) {
  277. c.wonce.Do(c.sleepLatency)
  278. var total int64
  279. for {
  280. n, err := c.WriteBucket.FillThrottle(func(remaining int64) (int64, error) {
  281. return io.CopyN(w, c.Conn, remaining)
  282. })
  283. total += n
  284. if err != nil {
  285. if err != io.EOF {
  286. log.Errorf("trafficshape: failed copying to writer: %v", err)
  287. }
  288. return total, err
  289. }
  290. }
  291. }
  292. func min(x, y int64) int64 {
  293. if x < y {
  294. return x
  295. }
  296. return y
  297. }
  298. // CheckExistenceAndValidity checks that the current url regex is present in the map, and that
  299. // the connection was established before the url shape map was last updated. We do not allow the
  300. // updated url shape map to traffic shape older connections.
  301. // Important: Assumes you have acquired the required locks and will release them youself.
  302. func (c *Conn) CheckExistenceAndValidity(URLRegex string) bool {
  303. shapeStillValid := c.Shapes.LastModifiedTime.Before(c.Established)
  304. _, p := c.Shapes.M[URLRegex]
  305. return p && shapeStillValid
  306. }
  307. // GetCurrentThrottle uses binary search to determine if the current byte offset ('start')
  308. // lies within a throttle interval. If so, also returns the bandwidth specified for that interval.
  309. func (c *Conn) GetCurrentThrottle(start int64) *ThrottleContext {
  310. c.Shapes.RLock()
  311. defer c.Shapes.RUnlock()
  312. if !c.CheckExistenceAndValidity(c.Context.URLRegex) {
  313. log.Debugf("existence check failed")
  314. return &ThrottleContext{
  315. ThrottleNow: false,
  316. }
  317. }
  318. c.Shapes.M[c.Context.URLRegex].RLock()
  319. defer c.Shapes.M[c.Context.URLRegex].RUnlock()
  320. throttles := c.Shapes.M[c.Context.URLRegex].Shape.Throttles
  321. if l := len(throttles); l != 0 {
  322. // ind is the first index in throttles with ByteStart > start.
  323. // Once we get ind, we can check the previous throttle, if any,
  324. // to see if its ByteEnd is after 'start'.
  325. ind := sort.Search(len(throttles),
  326. func(i int) bool { return throttles[i].ByteStart > start })
  327. // All throttles have Bytestart > start, hence not in throttle.
  328. if ind == 0 {
  329. return &ThrottleContext{
  330. ThrottleNow: false,
  331. }
  332. }
  333. // No throttle has Bytestart > start, so check the last throttle to
  334. // see if it ends after 'start'. Note: the last throttle is special
  335. // since it can have -1 (meaning infinity) as the ByteEnd.
  336. if ind == l {
  337. if throttles[l-1].ByteEnd > start || throttles[l-1].ByteEnd == -1 {
  338. return &ThrottleContext{
  339. ThrottleNow: true,
  340. Bandwidth: throttles[l-1].Bandwidth,
  341. }
  342. }
  343. return &ThrottleContext{
  344. ThrottleNow: false,
  345. }
  346. }
  347. // Check the previous throttle to see if it ends after 'start'.
  348. if throttles[ind-1].ByteEnd > start {
  349. return &ThrottleContext{
  350. ThrottleNow: true,
  351. Bandwidth: throttles[ind-1].Bandwidth,
  352. }
  353. }
  354. return &ThrottleContext{
  355. ThrottleNow: false,
  356. }
  357. }
  358. return &ThrottleContext{
  359. ThrottleNow: false,
  360. }
  361. }
  362. // GetNextActionFromByte takes in a byte offset and uses binary search to determine the upcoming
  363. // action, i.e the first action after the byte that still has a non zero count.
  364. func (c *Conn) GetNextActionFromByte(start int64) *NextActionInfo {
  365. c.Shapes.RLock()
  366. defer c.Shapes.RUnlock()
  367. if !c.CheckExistenceAndValidity(c.Context.URLRegex) {
  368. log.Debugf("existence check failed")
  369. return &NextActionInfo{
  370. ActionNext: false,
  371. }
  372. }
  373. c.Shapes.M[c.Context.URLRegex].RLock()
  374. defer c.Shapes.M[c.Context.URLRegex].RUnlock()
  375. actions := c.Shapes.M[c.Context.URLRegex].Shape.Actions
  376. if l := len(actions); l != 0 {
  377. ind := sort.Search(len(actions),
  378. func(i int) bool { return actions[i].getByte() >= start })
  379. return c.GetNextActionFromIndex(int64(ind))
  380. }
  381. return &NextActionInfo{
  382. ActionNext: false,
  383. }
  384. }
  385. // GetNextActionFromIndex takes in an index and returns the first action after the index that
  386. // has a non zero count, if there is one.
  387. func (c *Conn) GetNextActionFromIndex(ind int64) *NextActionInfo {
  388. c.Shapes.RLock()
  389. defer c.Shapes.RUnlock()
  390. if !c.CheckExistenceAndValidity(c.Context.URLRegex) {
  391. return &NextActionInfo{
  392. ActionNext: false,
  393. }
  394. }
  395. c.Shapes.M[c.Context.URLRegex].RLock()
  396. defer c.Shapes.M[c.Context.URLRegex].RUnlock()
  397. actions := c.Shapes.M[c.Context.URLRegex].Shape.Actions
  398. if l := int64(len(actions)); l != 0 {
  399. for ind < l && (actions[ind].getCount() == 0) {
  400. ind++
  401. }
  402. if ind >= l {
  403. return &NextActionInfo{
  404. ActionNext: false,
  405. }
  406. }
  407. return &NextActionInfo{
  408. ActionNext: true,
  409. Index: ind,
  410. ByteOffset: actions[ind].getByte(),
  411. }
  412. }
  413. return &NextActionInfo{
  414. ActionNext: false,
  415. }
  416. }
  417. // WriteDefaultBuckets writes bytes from b to the connection, optionally simulating
  418. // connection latency and throttling write throughput based on desired
  419. // bandwidth constraints. It uses the WriteBucket inherited from the listener.
  420. func (c *Conn) WriteDefaultBuckets(b []byte) (int, error) {
  421. c.wonce.Do(c.sleepLatency)
  422. var total int64
  423. for len(b) > 0 {
  424. var max int64
  425. n, err := c.WriteBucket.FillThrottle(func(remaining int64) (int64, error) {
  426. max = remaining
  427. if l := int64(len(b)); remaining >= l {
  428. max = l
  429. }
  430. n, err := c.Conn.Write(b[:max])
  431. return int64(n), err
  432. })
  433. total += n
  434. if err != nil {
  435. if err != io.EOF {
  436. log.Errorf("trafficshape: failed write: %v", err)
  437. }
  438. return int(total), err
  439. }
  440. b = b[max:]
  441. }
  442. return int(total), nil
  443. }
  444. // Write writes bytes from b to the connection, while enforcing throttles and performing actions.
  445. // It uses and updates the Context in the connection.
  446. func (c *Conn) Write(b []byte) (int, error) {
  447. if !c.Context.Shaping {
  448. return c.WriteDefaultBuckets(b)
  449. }
  450. c.wonce.Do(c.sleepLatency)
  451. var total int64
  452. // Write the header if needed, without enforcing any traffic shaping, and without updating
  453. // ByteOffset.
  454. if headerToWrite := c.Context.HeaderLen - c.Context.HeaderBytesWritten; headerToWrite > 0 {
  455. writeAmount := min(int64(len(b)), headerToWrite)
  456. n, err := c.Conn.Write(b[:writeAmount])
  457. if err != nil {
  458. if err != io.EOF {
  459. log.Errorf("trafficshape: failed write: %v", err)
  460. }
  461. return int(n), err
  462. }
  463. c.Context.HeaderBytesWritten += writeAmount
  464. total += writeAmount
  465. b = b[writeAmount:]
  466. }
  467. var amountToWrite int64
  468. for len(b) > 0 {
  469. var max int64
  470. // Determine the amount to be written up till the next action.
  471. amountToWrite = int64(len(b))
  472. if c.Context.NextActionInfo.ActionNext {
  473. amountTillNextAction := c.Context.NextActionInfo.ByteOffset - c.Context.ByteOffset
  474. if amountTillNextAction <= amountToWrite {
  475. amountToWrite = amountTillNextAction
  476. }
  477. }
  478. // Write into both the local and global buckets, as well as the underlying connection.
  479. n, err := c.Context.Buckets.WriteBucket.FillThrottleLocked(func(remaining int64) (int64, error) {
  480. max = min(remaining, amountToWrite)
  481. if max == 0 {
  482. return 0, nil
  483. }
  484. return c.Context.GlobalBucket.FillThrottleLocked(func(rem int64) (int64, error) {
  485. max = min(rem, max)
  486. n, err := c.Conn.Write(b[:max])
  487. return int64(n), err
  488. })
  489. })
  490. if err != nil {
  491. if err != io.EOF {
  492. log.Errorf("trafficshape: failed write: %v", err)
  493. }
  494. return int(total), err
  495. }
  496. // Update the current byte offset.
  497. c.Context.ByteOffset += n
  498. total += n
  499. b = b[max:]
  500. // Check if there was an upcoming action, and that the byte offset matches the action's byte.
  501. if c.Context.NextActionInfo.ActionNext &&
  502. c.Context.ByteOffset >= c.Context.NextActionInfo.ByteOffset {
  503. // Note here, we check again that the url shape map is still valid and that the action still has
  504. // a non zero count, since that could have been modified since the last time we checked.
  505. ind := c.Context.NextActionInfo.Index
  506. c.Shapes.RLock()
  507. if !c.CheckExistenceAndValidity(c.Context.URLRegex) {
  508. c.Shapes.RUnlock()
  509. // Write the remaining b using default buckets, and set Shaping as false
  510. // so that subsequent calls to Write() also use default buckets
  511. // without performing any actions.
  512. c.Context.Shaping = false
  513. writeTotal, e := c.WriteDefaultBuckets(b)
  514. return int(total) + writeTotal, e
  515. }
  516. c.Shapes.M[c.Context.URLRegex].Lock()
  517. actions := c.Shapes.M[c.Context.URLRegex].Shape.Actions
  518. if actions[ind].getCount() != 0 {
  519. // Update the action count, determine the type of action and perform it.
  520. actions[ind].decrementCount()
  521. switch action := actions[ind].(type) {
  522. case *Halt:
  523. d := action.Duration
  524. log.Debugf("trafficshape: Sleeping for time %d ms for urlregex %s at byte offset %d",
  525. d, c.Context.URLRegex, c.Context.ByteOffset)
  526. c.Shapes.M[c.Context.URLRegex].Unlock()
  527. c.Shapes.RUnlock()
  528. time.Sleep(time.Duration(d) * time.Millisecond)
  529. case *CloseConnection:
  530. log.Infof("trafficshape: Closing connection for urlregex %s at byte offset %d",
  531. c.Context.URLRegex, c.Context.ByteOffset)
  532. c.Shapes.M[c.Context.URLRegex].Unlock()
  533. c.Shapes.RUnlock()
  534. return int(total), &ErrForceClose{message: "Forcing close connection"}
  535. case *ChangeBandwidth:
  536. bw := action.Bandwidth
  537. log.Infof("trafficshape: Changing connection bandwidth to %d for urlregex %s at byte offset %d",
  538. bw, c.Context.URLRegex, c.Context.ByteOffset)
  539. c.Shapes.M[c.Context.URLRegex].Unlock()
  540. c.Shapes.RUnlock()
  541. c.Context.Buckets.WriteBucket.SetCapacity(bw)
  542. default:
  543. c.Shapes.M[c.Context.URLRegex].Unlock()
  544. c.Shapes.RUnlock()
  545. }
  546. } else {
  547. c.Shapes.M[c.Context.URLRegex].Unlock()
  548. c.Shapes.RUnlock()
  549. }
  550. // Get the next action to be performed, if any.
  551. c.Context.NextActionInfo = c.GetNextActionFromIndex(ind + 1)
  552. }
  553. }
  554. return int(total), nil
  555. }
  556. func (c *Conn) sleepLatency() {
  557. log.Debugf("trafficshape: simulating latency: %s", c.latency)
  558. time.Sleep(c.latency)
  559. }