25개 이상의 토픽을 선택하실 수 없습니다. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

1085 lines
33 KiB

  1. /*
  2. Copyright 2017 Google LLC
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package spanner
  14. import (
  15. "bytes"
  16. "container/heap"
  17. "context"
  18. "fmt"
  19. "math/rand"
  20. "sync"
  21. "sync/atomic"
  22. "testing"
  23. "time"
  24. "cloud.google.com/go/spanner/internal/testutil"
  25. sppb "google.golang.org/genproto/googleapis/spanner/v1"
  26. "google.golang.org/grpc"
  27. "google.golang.org/grpc/codes"
  28. "google.golang.org/grpc/status"
  29. )
  30. // TestSessionPoolConfigValidation tests session pool config validation.
  31. func TestSessionPoolConfigValidation(t *testing.T) {
  32. t.Parallel()
  33. sc := testutil.NewMockCloudSpannerClient(t)
  34. for _, test := range []struct {
  35. spc SessionPoolConfig
  36. err error
  37. }{
  38. {
  39. SessionPoolConfig{},
  40. errNoRPCGetter(),
  41. },
  42. {
  43. SessionPoolConfig{
  44. getRPCClient: func() (sppb.SpannerClient, error) {
  45. return sc, nil
  46. },
  47. MinOpened: 10,
  48. MaxOpened: 5,
  49. },
  50. errMinOpenedGTMaxOpened(5, 10),
  51. },
  52. } {
  53. if _, err := newSessionPool("mockdb", test.spc, nil); !testEqual(err, test.err) {
  54. t.Fatalf("want %v, got %v", test.err, err)
  55. }
  56. }
  57. }
  58. // TestSessionCreation tests session creation during sessionPool.Take().
  59. func TestSessionCreation(t *testing.T) {
  60. t.Parallel()
  61. ctx := context.Background()
  62. _, sp, mock, cleanup := serverClientMock(t, SessionPoolConfig{})
  63. defer cleanup()
  64. // Take three sessions from session pool, this should trigger session pool
  65. // to create three new sessions.
  66. shs := make([]*sessionHandle, 3)
  67. // gotDs holds the unique sessions taken from session pool.
  68. gotDs := map[string]bool{}
  69. for i := 0; i < len(shs); i++ {
  70. var err error
  71. shs[i], err = sp.take(ctx)
  72. if err != nil {
  73. t.Fatalf("failed to get session(%v): %v", i, err)
  74. }
  75. gotDs[shs[i].getID()] = true
  76. }
  77. if len(gotDs) != len(shs) {
  78. t.Fatalf("session pool created %v sessions, want %v", len(gotDs), len(shs))
  79. }
  80. if wantDs := mock.DumpSessions(); !testEqual(gotDs, wantDs) {
  81. t.Fatalf("session pool creates sessions %v, want %v", gotDs, wantDs)
  82. }
  83. // Verify that created sessions are recorded correctly in session pool.
  84. sp.mu.Lock()
  85. if int(sp.numOpened) != len(shs) {
  86. t.Fatalf("session pool reports %v open sessions, want %v", sp.numOpened, len(shs))
  87. }
  88. if sp.createReqs != 0 {
  89. t.Fatalf("session pool reports %v session create requests, want 0", int(sp.createReqs))
  90. }
  91. sp.mu.Unlock()
  92. // Verify that created sessions are tracked correctly by healthcheck queue.
  93. hc := sp.hc
  94. hc.mu.Lock()
  95. if hc.queue.Len() != len(shs) {
  96. t.Fatalf("healthcheck queue length = %v, want %v", hc.queue.Len(), len(shs))
  97. }
  98. for _, s := range hc.queue.sessions {
  99. if !gotDs[s.getID()] {
  100. t.Fatalf("session %v is in healthcheck queue, but it is not created by session pool", s.getID())
  101. }
  102. }
  103. hc.mu.Unlock()
  104. }
// TestTakeFromIdleList tests taking sessions from session pool's idle list.
func TestTakeFromIdleList(t *testing.T) {
	t.Parallel()
	ctx := context.Background()
	// Make sure maintainer keeps the idle sessions.
	_, sp, mock, cleanup := serverClientMock(t, SessionPoolConfig{MaxIdle: 10})
	defer cleanup()
	// Take ten sessions from session pool and recycle them.
	shs := make([]*sessionHandle, 10)
	for i := 0; i < len(shs); i++ {
		var err error
		shs[i], err = sp.take(ctx)
		if err != nil {
			t.Fatalf("failed to get session(%v): %v", i, err)
		}
	}
	// Make sure it's sampled once before recycling, otherwise it will be
	// cleaned up.
	<-time.After(sp.SessionPoolConfig.healthCheckSampleInterval)
	// Return all ten sessions to the pool's idle list.
	for i := 0; i < len(shs); i++ {
		shs[i].recycle()
	}
	// Further session requests from session pool won't cause mockclient to
	// create more sessions.
	wantSessions := mock.DumpSessions()
	// Take ten sessions from session pool again, this time all sessions should
	// come from idle list.
	gotSessions := map[string]bool{}
	for i := 0; i < len(shs); i++ {
		sh, err := sp.take(ctx)
		if err != nil {
			t.Fatalf("cannot take session from session pool: %v", err)
		}
		gotSessions[sh.getID()] = true
	}
	if len(gotSessions) != 10 {
		t.Fatalf("got %v unique sessions, want 10", len(gotSessions))
	}
	// The re-taken sessions must be exactly those the mock already knows,
	// proving no new sessions were created on the second round.
	if !testEqual(gotSessions, wantSessions) {
		t.Fatalf("got sessions: %v, want %v", gotSessions, wantSessions)
	}
}
// TestTakeWriteSessionFromIdleList tests taking write sessions from session
// pool's idle list.
func TestTakeWriteSessionFromIdleList(t *testing.T) {
	t.Parallel()
	ctx := context.Background()
	// Make sure maintainer keeps the idle sessions.
	_, sp, mock, cleanup := serverClientMock(t, SessionPoolConfig{MaxIdle: 20})
	defer cleanup()
	// Take ten sessions from session pool and recycle them.
	shs := make([]*sessionHandle, 10)
	for i := 0; i < len(shs); i++ {
		var err error
		shs[i], err = sp.takeWriteSession(ctx)
		if err != nil {
			t.Fatalf("failed to get session(%v): %v", i, err)
		}
	}
	// Make sure it's sampled once before recycling, otherwise it will be
	// cleaned up.
	<-time.After(sp.SessionPoolConfig.healthCheckSampleInterval)
	// Return all ten write sessions to the pool's idle list.
	for i := 0; i < len(shs); i++ {
		shs[i].recycle()
	}
	// Further session requests from session pool won't cause mockclient to
	// create more sessions.
	wantSessions := mock.DumpSessions()
	// Take ten sessions from session pool again, this time all sessions should
	// come from idle list.
	gotSessions := map[string]bool{}
	for i := 0; i < len(shs); i++ {
		sh, err := sp.takeWriteSession(ctx)
		if err != nil {
			t.Fatalf("cannot take session from session pool: %v", err)
		}
		gotSessions[sh.getID()] = true
	}
	if len(gotSessions) != 10 {
		t.Fatalf("got %v unique sessions, want 10", len(gotSessions))
	}
	// The re-taken sessions must be exactly those the mock already knows,
	// proving no new sessions were created on the second round.
	if !testEqual(gotSessions, wantSessions) {
		t.Fatalf("got sessions: %v, want %v", gotSessions, wantSessions)
	}
}
// TestTakeFromIdleListChecked tests taking sessions from session pool's idle
// list, but with an extra ping check.
func TestTakeFromIdleListChecked(t *testing.T) {
	t.Parallel()
	ctx := context.Background()
	// Make sure maintainer keeps the idle sessions.
	_, sp, mock, cleanup := serverClientMock(t, SessionPoolConfig{
		MaxIdle:                   1,
		HealthCheckInterval:       50 * time.Millisecond,
		healthCheckSampleInterval: 10 * time.Millisecond,
	})
	defer cleanup()
	// Stop healthcheck workers to simulate slow pings.
	sp.hc.close()
	// Create a session and recycle it.
	sh, err := sp.take(ctx)
	if err != nil {
		t.Fatalf("failed to get session: %v", err)
	}
	// Make sure it's sampled once before recycling, otherwise it will be
	// cleaned up.
	<-time.After(sp.SessionPoolConfig.healthCheckSampleInterval)
	wantSid := sh.getID()
	sh.recycle()
	// TODO(deklerk) get rid of this
	<-time.After(time.Second)
	// Two back-to-back session requests, both of them should return the same
	// session created before and none of them should trigger a session ping.
	for i := 0; i < 2; i++ {
		// Take the session from the idle list and recycle it.
		sh, err = sp.take(ctx)
		if err != nil {
			t.Fatalf("%v - failed to get session: %v", i, err)
		}
		if gotSid := sh.getID(); gotSid != wantSid {
			t.Fatalf("%v - got session id: %v, want %v", i, gotSid, wantSid)
		}
		// The two back-to-back session requests shouldn't trigger any session
		// pings because sessionPool.Take reschedules the next healthcheck.
		// Only the single ping from the initial take should be recorded.
		if got, want := mock.DumpPings(), ([]string{wantSid}); !testEqual(got, want) {
			t.Fatalf("%v - got ping session requests: %v, want %v", i, got, want)
		}
		sh.recycle()
	}
	// Inject session error to server stub, and take the session from the
	// session pool, the old session should be destroyed and the session pool
	// will create a new session.
	mock.GetSessionFn = func(c context.Context, r *sppb.GetSessionRequest, opts ...grpc.CallOption) (*sppb.Session, error) {
		mock.MockCloudSpannerClient.ReceivedRequests <- r
		return nil, status.Errorf(codes.NotFound, "Session not found")
	}
	// Delay to trigger sessionPool.Take to ping the session.
	// TODO(deklerk) get rid of this
	<-time.After(time.Second)
	// take will take the idle session. Then it will send a GetSession request
	// to check if it's healthy. It'll discover that it's not healthy
	// (NotFound), drop it, and create a new session.
	sh, err = sp.take(ctx)
	if err != nil {
		t.Fatalf("failed to get session: %v", err)
	}
	// Only the replacement session should remain on the server side.
	ds := mock.DumpSessions()
	if len(ds) != 1 {
		t.Fatalf("dumped sessions from mockclient: %v, want %v", ds, sh.getID())
	}
	if sh.getID() == wantSid {
		t.Fatalf("sessionPool.Take still returns the same session %v, want it to create a new one", wantSid)
	}
}
// TestTakeFromIdleWriteListChecked tests taking sessions from session pool's
// idle list, but with an extra ping check.
func TestTakeFromIdleWriteListChecked(t *testing.T) {
	t.Parallel()
	ctx := context.Background()
	// Make sure maintainer keeps the idle sessions.
	_, sp, mock, cleanup := serverClientMock(t, SessionPoolConfig{
		MaxIdle:                   1,
		HealthCheckInterval:       50 * time.Millisecond,
		healthCheckSampleInterval: 10 * time.Millisecond,
	})
	defer cleanup()
	// Stop healthcheck workers to simulate slow pings.
	sp.hc.close()
	// Create a session and recycle it.
	sh, err := sp.takeWriteSession(ctx)
	if err != nil {
		t.Fatalf("failed to get session: %v", err)
	}
	wantSid := sh.getID()
	// Make sure it's sampled once before recycling, otherwise it will be
	// cleaned up.
	<-time.After(sp.SessionPoolConfig.healthCheckSampleInterval)
	sh.recycle()
	// TODO(deklerk) get rid of this
	<-time.After(time.Second)
	// Two back-to-back session requests, both of them should return the same
	// session created before and none of them should trigger a session ping.
	for i := 0; i < 2; i++ {
		// Take the session from the idle list and recycle it.
		sh, err = sp.takeWriteSession(ctx)
		if err != nil {
			t.Fatalf("%v - failed to get session: %v", i, err)
		}
		if gotSid := sh.getID(); gotSid != wantSid {
			t.Fatalf("%v - got session id: %v, want %v", i, gotSid, wantSid)
		}
		// The two back-to-back session requests shouldn't trigger any session
		// pings because sessionPool.Take reschedules the next healthcheck.
		// Only the single ping from the initial take should be recorded.
		if got, want := mock.DumpPings(), ([]string{wantSid}); !testEqual(got, want) {
			t.Fatalf("%v - got ping session requests: %v, want %v", i, got, want)
		}
		sh.recycle()
	}
	// Inject session error to mockclient, and take the session from the
	// session pool, the old session should be destroyed and the session pool
	// will create a new session.
	mock.GetSessionFn = func(c context.Context, r *sppb.GetSessionRequest, opts ...grpc.CallOption) (*sppb.Session, error) {
		mock.MockCloudSpannerClient.ReceivedRequests <- r
		return nil, status.Errorf(codes.NotFound, "Session not found")
	}
	// Delay to trigger sessionPool.Take to ping the session.
	// TODO(deklerk) get rid of this
	<-time.After(time.Second)
	// The ping on take now fails (NotFound), so the pool must drop the old
	// session and hand back a freshly created one.
	sh, err = sp.takeWriteSession(ctx)
	if err != nil {
		t.Fatalf("failed to get session: %v", err)
	}
	// Only the replacement session should remain on the server side.
	ds := mock.DumpSessions()
	if len(ds) != 1 {
		t.Fatalf("dumped sessions from mockclient: %v, want %v", ds, sh.getID())
	}
	if sh.getID() == wantSid {
		t.Fatalf("sessionPool.Take still returns the same session %v, want it to create a new one", wantSid)
	}
}
// TestMaxOpenedSessions tests max open sessions constraint.
func TestMaxOpenedSessions(t *testing.T) {
	t.Parallel()
	ctx := context.Background()
	// Pool is capped at a single open session.
	_, sp, _, cleanup := serverClientMock(t, SessionPoolConfig{MaxOpened: 1})
	defer cleanup()
	sh1, err := sp.take(ctx)
	if err != nil {
		t.Fatalf("cannot take session from session pool: %v", err)
	}
	// Session request will timeout due to the max open sessions constraint.
	ctx2, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	_, gotErr := sp.take(ctx2)
	if wantErr := errGetSessionTimeout(); !testEqual(gotErr, wantErr) {
		t.Fatalf("the second session retrival returns error %v, want %v", gotErr, wantErr)
	}
	go func() {
		// TODO(deklerk) remove this
		<-time.After(time.Second)
		// Destroy the first session to allow the next session request to
		// proceed.
		sh1.destroy()
	}()
	// Now session request can be processed because the first session will be
	// destroyed (sp.take blocks until the capacity frees up).
	sh2, err := sp.take(ctx)
	if err != nil {
		t.Fatalf("after the first session is destroyed, session retrival still returns error %v, want nil", err)
	}
	if !sh2.session.isValid() || sh2.getID() == "" {
		t.Fatalf("got invalid session: %v", sh2.session)
	}
}
  360. // TestMinOpenedSessions tests min open session constraint.
  361. func TestMinOpenedSessions(t *testing.T) {
  362. t.Parallel()
  363. ctx := context.Background()
  364. _, sp, _, cleanup := serverClientMock(t, SessionPoolConfig{MinOpened: 1})
  365. defer cleanup()
  366. // Take ten sessions from session pool and recycle them.
  367. var ss []*session
  368. var shs []*sessionHandle
  369. for i := 0; i < 10; i++ {
  370. sh, err := sp.take(ctx)
  371. if err != nil {
  372. t.Fatalf("failed to get session(%v): %v", i, err)
  373. }
  374. ss = append(ss, sh.session)
  375. shs = append(shs, sh)
  376. sh.recycle()
  377. }
  378. for _, sh := range shs {
  379. sh.recycle()
  380. }
  381. // Simulate session expiration.
  382. for _, s := range ss {
  383. s.destroy(true)
  384. }
  385. sp.mu.Lock()
  386. defer sp.mu.Unlock()
  387. // There should be still one session left in idle list due to the min open
  388. // sessions constraint.
  389. if sp.idleList.Len() != 1 {
  390. t.Fatalf("got %v sessions in idle list, want 1 %d", sp.idleList.Len(), sp.numOpened)
  391. }
  392. }
// TestMaxBurst tests max burst constraint.
func TestMaxBurst(t *testing.T) {
	t.Parallel()
	ctx := context.Background()
	_, sp, mock, cleanup := serverClientMock(t, SessionPoolConfig{MaxBurst: 1})
	defer cleanup()
	// Will cause session creation RPC to be retried forever.
	allowRequests := make(chan struct{})
	mock.CreateSessionFn = func(c context.Context, r *sppb.CreateSessionRequest, opts ...grpc.CallOption) (*sppb.Session, error) {
		select {
		case <-allowRequests:
			// Channel closed: stop failing and delegate to the real mock.
			return mock.MockCloudSpannerClient.CreateSession(c, r, opts...)
		default:
			mock.MockCloudSpannerClient.ReceivedRequests <- r
			return nil, status.Errorf(codes.Unavailable, "try later")
		}
	}
	// This session request will never finish until the injected error is
	// cleared.
	go sp.take(ctx)
	// Poll for the execution of the first session request.
	for {
		sp.mu.Lock()
		cr := sp.createReqs
		sp.mu.Unlock()
		if cr == 0 {
			<-time.After(time.Second)
			continue
		}
		// The first session request is being executed.
		break
	}
	ctx2, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	_, gotErr := sp.take(ctx2)
	// Since MaxBurst == 1, the second session request should block.
	if wantErr := errGetSessionTimeout(); !testEqual(gotErr, wantErr) {
		t.Fatalf("session retrival returns error %v, want %v", gotErr, wantErr)
	}
	// Let the first session request succeed.
	close(allowRequests)
	// Now new session request can proceed because the first session request
	// will eventually succeed.
	sh, err := sp.take(ctx)
	if err != nil {
		t.Fatalf("session retrival returns error %v, want nil", err)
	}
	if !sh.session.isValid() || sh.getID() == "" {
		t.Fatalf("got invalid session: %v", sh.session)
	}
}
  443. // TestSessionRecycle tests recycling sessions.
  444. func TestSessionRecycle(t *testing.T) {
  445. t.Parallel()
  446. ctx := context.Background()
  447. _, sp, _, cleanup := serverClientMock(t, SessionPoolConfig{MinOpened: 1, MaxIdle: 5})
  448. defer cleanup()
  449. // Test session is correctly recycled and reused.
  450. for i := 0; i < 20; i++ {
  451. s, err := sp.take(ctx)
  452. if err != nil {
  453. t.Fatalf("cannot get the session %v: %v", i, err)
  454. }
  455. s.recycle()
  456. }
  457. sp.mu.Lock()
  458. defer sp.mu.Unlock()
  459. // Ideally it should only be 1, because the session should be recycled and
  460. // re-used each time. However, sometimes the pool maintainer might increase
  461. // the pool size by 1 right around the time we take (which also increases
  462. // the pool size by 1), so this assertion is OK with either 1 or 2. We
  463. // expect never to see more than 2, though, even when MaxIdle is quite high:
  464. // each session should be recycled and re-used.
  465. if sp.numOpened != 1 && sp.numOpened != 2 {
  466. t.Fatalf("Expect session pool size 1 or 2, got %d", sp.numOpened)
  467. }
  468. }
// TODO(deklerk) Investigate why s.destroy(true) is flakey.
// TestSessionDestroy tests destroying sessions.
func TestSessionDestroy(t *testing.T) {
	t.Skip("s.destroy(true) is flakey")
	t.Parallel()
	ctx := context.Background()
	_, sp, _, cleanup := serverClientMock(t, SessionPoolConfig{MinOpened: 1})
	defer cleanup()
	<-time.After(10 * time.Millisecond) // maintainer will create one session; wait for it to avoid flakiness in the test
	sh, err := sp.take(ctx)
	if err != nil {
		t.Fatalf("cannot get session from session pool: %v", err)
	}
	s := sh.session
	sh.recycle()
	// destroy(true) is expiration mode and must not remove the session while
	// the MinOpened floor would be violated.
	if d := s.destroy(true); d || !s.isValid() {
		// Session should be remaining because of min open sessions constraint.
		t.Fatalf("session %s invalid, want it to stay alive. (destroy in expiration mode, success: %v)", s.id, d)
	}
	// destroy(false) is unconditional and must remove the session.
	if d := s.destroy(false); !d || s.isValid() {
		// Session should be destroyed.
		t.Fatalf("failed to destroy session %s. (destroy in default mode, success: %v)", s.id, d)
	}
}
  493. // TestHcHeap tests heap operation on top of hcHeap.
  494. func TestHcHeap(t *testing.T) {
  495. in := []*session{
  496. {nextCheck: time.Unix(10, 0)},
  497. {nextCheck: time.Unix(0, 5)},
  498. {nextCheck: time.Unix(1, 8)},
  499. {nextCheck: time.Unix(11, 7)},
  500. {nextCheck: time.Unix(6, 3)},
  501. }
  502. want := []*session{
  503. {nextCheck: time.Unix(1, 8), hcIndex: 0},
  504. {nextCheck: time.Unix(6, 3), hcIndex: 1},
  505. {nextCheck: time.Unix(8, 2), hcIndex: 2},
  506. {nextCheck: time.Unix(10, 0), hcIndex: 3},
  507. {nextCheck: time.Unix(11, 7), hcIndex: 4},
  508. }
  509. hh := hcHeap{}
  510. for _, s := range in {
  511. heap.Push(&hh, s)
  512. }
  513. // Change top of the heap and do a adjustment.
  514. hh.sessions[0].nextCheck = time.Unix(8, 2)
  515. heap.Fix(&hh, 0)
  516. for idx := 0; hh.Len() > 0; idx++ {
  517. got := heap.Pop(&hh).(*session)
  518. want[idx].hcIndex = -1
  519. if !testEqual(got, want[idx]) {
  520. t.Fatalf("%v: heap.Pop returns %v, want %v", idx, got, want[idx])
  521. }
  522. }
  523. }
// TestHealthCheckScheduler tests if healthcheck workers can schedule and
// perform healthchecks properly.
func TestHealthCheckScheduler(t *testing.T) {
	t.Parallel()
	ctx := context.Background()
	_, sp, mock, cleanup := serverClientMock(t, SessionPoolConfig{
		HealthCheckInterval:       50 * time.Millisecond,
		healthCheckSampleInterval: 10 * time.Millisecond,
	})
	defer cleanup()
	// Create 50 sessions.
	ss := []string{}
	for i := 0; i < 50; i++ {
		sh, err := sp.take(ctx)
		if err != nil {
			t.Fatalf("cannot get session from session pool: %v", err)
		}
		ss = append(ss, sh.getID())
	}
	// Wait for 10-30 pings per session.
	waitFor(t, func() error {
		// Tally pings received so far per session id.
		dp := mock.DumpPings()
		gotPings := map[string]int64{}
		for _, p := range dp {
			gotPings[p]++
		}
		for _, s := range ss {
			// Expect ~20 pings; accept anything in the open range (10, 30).
			want := int64(20)
			if got := gotPings[s]; got < want/2 || got > want+want/2 {
				// This is an unacceptable amount of pings.
				return fmt.Errorf("got %v healthchecks on session %v, want it between (%v, %v)", got, s, want/2, want+want/2)
			}
		}
		return nil
	})
}
// TestWriteSessionsPrepared tests that a fraction of sessions is prepared for
// write by the health checker.
func TestWriteSessionsPrepared(t *testing.T) {
	t.Parallel()
	ctx := context.Background()
	_, sp, _, cleanup := serverClientMock(t, SessionPoolConfig{WriteSessions: 0.5, MaxIdle: 20})
	defer cleanup()
	shs := make([]*sessionHandle, 10)
	var err error
	for i := 0; i < 10; i++ {
		shs[i], err = sp.take(ctx)
		if err != nil {
			t.Fatalf("cannot get session from session pool: %v", err)
		}
	}
	// Now there are 10 sessions in the pool. Release them.
	for _, sh := range shs {
		sh.recycle()
	}
	// Sleep for 1s, allowing healthcheck workers to invoke begin transaction.
	// TODO(deklerk) get rid of this
	<-time.After(time.Second)
	// With WriteSessions == 0.5, five of the ten sessions should now carry a
	// prepared transaction.
	wshs := make([]*sessionHandle, 5)
	for i := 0; i < 5; i++ {
		wshs[i], err = sp.takeWriteSession(ctx)
		if err != nil {
			t.Fatalf("cannot get session from session pool: %v", err)
		}
		if wshs[i].getTransactionID() == nil {
			t.Fatalf("got nil transaction id from session pool")
		}
	}
	for _, sh := range wshs {
		sh.recycle()
	}
	// TODO(deklerk) get rid of this
	<-time.After(time.Second)
	// Now force creation of 10 more sessions.
	shs = make([]*sessionHandle, 20)
	for i := 0; i < 20; i++ {
		shs[i], err = sp.take(ctx)
		if err != nil {
			t.Fatalf("cannot get session from session pool: %v", err)
		}
	}
	// Now there are 20 sessions in the pool. Release them.
	for _, sh := range shs {
		sh.recycle()
	}
	// TODO(deklerk) get rid of this
	<-time.After(time.Second)
	// Half of the 20 sessions should end up write-prepared.
	if sp.idleWriteList.Len() != 10 {
		t.Fatalf("Expect 10 write prepared session, got: %d", sp.idleWriteList.Len())
	}
}
// TestTakeFromWriteQueue tests that sessionPool.take() returns write prepared sessions as well.
func TestTakeFromWriteQueue(t *testing.T) {
	t.Parallel()
	ctx := context.Background()
	// WriteSessions == 1.0 forces every idle session to be write-prepared.
	_, sp, _, cleanup := serverClientMock(t, SessionPoolConfig{MaxOpened: 1, WriteSessions: 1.0, MaxIdle: 1})
	defer cleanup()
	sh, err := sp.take(ctx)
	if err != nil {
		t.Fatalf("cannot get session from session pool: %v", err)
	}
	sh.recycle()
	// Wait for the health checker to prepare the recycled session for write.
	// TODO(deklerk) get rid of this
	<-time.After(time.Second)
	// The session should now be in write queue but take should also return it.
	if sp.idleWriteList.Len() == 0 {
		t.Fatalf("write queue unexpectedly empty")
	}
	if sp.idleList.Len() != 0 {
		t.Fatalf("read queue not empty")
	}
	sh, err = sp.take(ctx)
	if err != nil {
		t.Fatalf("cannot get session from session pool: %v", err)
	}
	sh.recycle()
}
// TestSessionHealthCheck tests healthchecking cases.
func TestSessionHealthCheck(t *testing.T) {
	t.Parallel()
	ctx := context.Background()
	_, sp, mock, cleanup := serverClientMock(t, SessionPoolConfig{
		HealthCheckInterval:       50 * time.Millisecond,
		healthCheckSampleInterval: 10 * time.Millisecond,
	})
	defer cleanup()
	// requestShouldErr toggles the mock GetSession into failure mode; it is
	// read/written atomically because healthcheck workers run concurrently.
	var requestShouldErr int64 // 0 == false, 1 == true
	mock.GetSessionFn = func(c context.Context, r *sppb.GetSessionRequest, opts ...grpc.CallOption) (*sppb.Session, error) {
		if shouldErr := atomic.LoadInt64(&requestShouldErr); shouldErr == 1 {
			mock.MockCloudSpannerClient.ReceivedRequests <- r
			return nil, status.Errorf(codes.NotFound, "Session not found")
		}
		return mock.MockCloudSpannerClient.GetSession(c, r, opts...)
	}
	// Test pinging sessions.
	sh, err := sp.take(ctx)
	if err != nil {
		t.Fatalf("cannot get session from session pool: %v", err)
	}
	// Wait for healthchecker to send pings to session.
	waitFor(t, func() error {
		pings := mock.DumpPings()
		if len(pings) == 0 || pings[0] != sh.getID() {
			return fmt.Errorf("healthchecker didn't send any ping to session %v", sh.getID())
		}
		return nil
	})
	// Test broken session detection.
	sh, err = sp.take(ctx)
	if err != nil {
		t.Fatalf("cannot get session from session pool: %v", err)
	}
	atomic.SwapInt64(&requestShouldErr, 1)
	// Wait for healthcheck workers to find the broken session and tear it down.
	// TODO(deklerk) get rid of this
	<-time.After(1 * time.Second)
	s := sh.session
	if sh.session.isValid() {
		t.Fatalf("session(%v) is still alive, want it to be dropped by healthcheck workers", s)
	}
	atomic.SwapInt64(&requestShouldErr, 0)
	// Test garbage collection: closing the pool must invalidate outstanding
	// sessions.
	sh, err = sp.take(ctx)
	if err != nil {
		t.Fatalf("cannot get session from session pool: %v", err)
	}
	sp.close()
	if sh.session.isValid() {
		t.Fatalf("session(%v) is still alive, want it to be garbage collected", s)
	}
}
// TestStressSessionPool does stress test on session pool by the following concurrent operations:
// 1) Test worker gets a session from the pool.
// 2) Test worker turns a session back into the pool.
// 3) Test worker destroys a session got from the pool.
// 4) Healthcheck destroys a broken session (because a worker has already destroyed it).
// 5) Test worker closes the session pool.
//
// During the test, the session pool maintainer maintains the number of sessions,
// and it is expected that all sessions that are taken from session pool remains valid.
// When all test workers and healthcheck workers exit, mockclient, session pool
// and healthchecker should be in consistent state.
func TestStressSessionPool(t *testing.T) {
	t.Parallel()
	ctx := context.Background()
	// Use concurrent workers to test different session pool built from different configurations.
	for ti, cfg := range []SessionPoolConfig{
		{},
		{MinOpened: 10, MaxOpened: 100},
		{MaxBurst: 50},
		{MinOpened: 10, MaxOpened: 200, MaxBurst: 5},
		{MinOpened: 10, MaxOpened: 200, MaxBurst: 5, WriteSessions: 0.2},
	} {
		var wg sync.WaitGroup
		// Create a more aggressive session healthchecker to increase test concurrency.
		cfg.HealthCheckInterval = 50 * time.Millisecond
		cfg.healthCheckSampleInterval = 10 * time.Millisecond
		cfg.HealthCheckWorkers = 50
		sc := testutil.NewMockCloudSpannerClient(t)
		cfg.getRPCClient = func() (sppb.SpannerClient, error) {
			return sc, nil
		}
		sp, _ := newSessionPool("mockdb", cfg, nil)
		// NOTE(review): these defers run at function exit, not per loop
		// iteration, so resources for every config accumulate until the test
		// returns — consider closing at the end of each iteration instead.
		defer sp.hc.close()
		defer sp.close()
		for i := 0; i < 100; i++ {
			wg.Add(1)
			// Schedule a test worker.
			go func(idx int, pool *sessionPool, client sppb.SpannerClient) {
				defer wg.Done()
				// Test worker iterates 1K times and tries different session / session pool operations.
				for j := 0; j < 1000; j++ {
					if idx%10 == 0 && j >= 900 {
						// Close the pool in selected set of workers during the middle of the test.
						pool.close()
					}
					// Take a write sessions ~ 20% of the times.
					takeWrite := rand.Intn(5) == 4
					var (
						sh     *sessionHandle
						gotErr error
					)
					if takeWrite {
						sh, gotErr = pool.takeWriteSession(ctx)
					} else {
						sh, gotErr = pool.take(ctx)
					}
					if gotErr != nil {
						// A take error is only acceptable once the pool has
						// been closed, and then only the dedicated error.
						if pool.isValid() {
							t.Errorf("%v.%v: pool.take returns error when pool is still valid: %v", ti, idx, gotErr)
						}
						if wantErr := errInvalidSessionPool(); !testEqual(gotErr, wantErr) {
							t.Errorf("%v.%v: got error when pool is closed: %v, want %v", ti, idx, gotErr, wantErr)
						}
						continue
					}
					// Verify if session is valid when session pool is valid. Note that if session pool is invalid after sh is taken,
					// then sh might be invalidated by healthcheck workers.
					if (sh.getID() == "" || sh.session == nil || !sh.session.isValid()) && pool.isValid() {
						t.Errorf("%v.%v.%v: pool.take returns invalid session %v", ti, idx, takeWrite, sh.session)
					}
					if takeWrite && sh.getTransactionID() == nil {
						t.Errorf("%v.%v: pool.takeWriteSession returns session %v without transaction", ti, idx, sh.session)
					}
					if rand.Intn(100) < idx {
						// Random sleep before destroying/recycling the session, to give healthcheck worker a chance to step in.
						<-time.After(time.Duration(rand.Int63n(int64(cfg.HealthCheckInterval))))
					}
					if rand.Intn(100) < idx {
						// destroy the session.
						sh.destroy()
						continue
					}
					// recycle the session.
					sh.recycle()
				}
			}(i, sp, sc)
		}
		wg.Wait()
		sp.hc.close()
		// Here the states of healthchecker, session pool and mockclient are stable.
		idleSessions := map[string]bool{}
		hcSessions := map[string]bool{}
		mockSessions := sc.DumpSessions()
		// Dump session pool's idle list.
		for sl := sp.idleList.Front(); sl != nil; sl = sl.Next() {
			s := sl.Value.(*session)
			if idleSessions[s.getID()] {
				t.Fatalf("%v: found duplicated session in idle list: %v", ti, s.getID())
			}
			idleSessions[s.getID()] = true
		}
		// Dump the write-prepared idle list into the same set; a session must
		// not appear in both lists.
		for sl := sp.idleWriteList.Front(); sl != nil; sl = sl.Next() {
			s := sl.Value.(*session)
			if idleSessions[s.getID()] {
				t.Fatalf("%v: found duplicated session in idle write list: %v", ti, s.getID())
			}
			idleSessions[s.getID()] = true
		}
		sp.mu.Lock()
		if int(sp.numOpened) != len(idleSessions) {
			t.Fatalf("%v: number of opened sessions (%v) != number of idle sessions (%v)", ti, sp.numOpened, len(idleSessions))
		}
		if sp.createReqs != 0 {
			t.Fatalf("%v: number of pending session creations = %v, want 0", ti, sp.createReqs)
		}
		// Dump healthcheck queue.
		for _, s := range sp.hc.queue.sessions {
			if hcSessions[s.getID()] {
				t.Fatalf("%v: found duplicated session in healthcheck queue: %v", ti, s.getID())
			}
			hcSessions[s.getID()] = true
		}
		sp.mu.Unlock()
		// Verify that idleSessions == hcSessions == mockSessions.
		if !testEqual(idleSessions, hcSessions) {
			t.Fatalf("%v: sessions in idle list (%v) != sessions in healthcheck queue (%v)", ti, idleSessions, hcSessions)
		}
		if !testEqual(hcSessions, mockSessions) {
			t.Fatalf("%v: sessions in healthcheck queue (%v) != sessions in mockclient (%v)", ti, hcSessions, mockSessions)
		}
		// Closing the pool must delete every session on the (mock) server.
		sp.close()
		mockSessions = sc.DumpSessions()
		if len(mockSessions) != 0 {
			t.Fatalf("Found live sessions: %v", mockSessions)
		}
	}
}
  831. // TODO(deklerk) Investigate why this test is flakey, even with waitFor. Example
  832. // flakey failure: session_test.go:946: after 15s waiting, got Scale down. Expect 5 open, got 6
  833. //
  834. // TestMaintainer checks the session pool maintainer maintains the number of sessions in the following cases
  835. // 1. On initialization of session pool, replenish session pool to meet MinOpened or MaxIdle.
  836. // 2. On increased session usage, provision extra MaxIdle sessions.
  837. // 3. After the surge passes, scale down the session pool accordingly.
  838. func TestMaintainer(t *testing.T) {
  839. t.Skip("asserting session state seems flakey")
  840. t.Parallel()
  841. ctx := context.Background()
  842. minOpened := uint64(5)
  843. maxIdle := uint64(4)
  844. _, sp, _, cleanup := serverClientMock(t, SessionPoolConfig{MinOpened: minOpened, MaxIdle: maxIdle})
  845. defer cleanup()
  846. sampleInterval := sp.SessionPoolConfig.healthCheckSampleInterval
  847. waitFor(t, func() error {
  848. sp.mu.Lock()
  849. defer sp.mu.Unlock()
  850. if sp.numOpened != 5 {
  851. return fmt.Errorf("Replenish. Expect %d open, got %d", sp.MinOpened, sp.numOpened)
  852. }
  853. return nil
  854. })
  855. // To save test time, we are not creating many sessions, because the time
  856. // to create sessions will have impact on the decision on sessionsToKeep.
  857. // We also parallelize the take and recycle process.
  858. shs := make([]*sessionHandle, 10)
  859. for i := 0; i < len(shs); i++ {
  860. var err error
  861. shs[i], err = sp.take(ctx)
  862. if err != nil {
  863. t.Fatalf("cannot get session from session pool: %v", err)
  864. }
  865. }
  866. sp.mu.Lock()
  867. if sp.numOpened != 10 {
  868. t.Fatalf("Scale out from normal use. Expect %d open, got %d", 10, sp.numOpened)
  869. }
  870. sp.mu.Unlock()
  871. <-time.After(sampleInterval)
  872. for _, sh := range shs[:7] {
  873. sh.recycle()
  874. }
  875. waitFor(t, func() error {
  876. sp.mu.Lock()
  877. defer sp.mu.Unlock()
  878. if sp.numOpened != 7 {
  879. return fmt.Errorf("Keep extra MaxIdle sessions. Expect %d open, got %d", 7, sp.numOpened)
  880. }
  881. return nil
  882. })
  883. for _, sh := range shs[7:] {
  884. sh.recycle()
  885. }
  886. waitFor(t, func() error {
  887. sp.mu.Lock()
  888. defer sp.mu.Unlock()
  889. if sp.numOpened != minOpened {
  890. return fmt.Errorf("Scale down. Expect %d open, got %d", minOpened, sp.numOpened)
  891. }
  892. return nil
  893. })
  894. }
  895. // Tests that maintainer creates up to MinOpened connections.
  896. //
  897. // Historical context: This test also checks that a low healthCheckSampleInterval
  898. // does not prevent it from opening connections. See: https://github.com/googleapis/google-cloud-go/issues/1259
  899. func TestMaintainer_CreatesSessions(t *testing.T) {
  900. t.Parallel()
  901. rawServerStub := testutil.NewMockCloudSpannerClient(t)
  902. serverClientMock := testutil.FuncMock{MockCloudSpannerClient: rawServerStub}
  903. serverClientMock.CreateSessionFn = func(c context.Context, r *sppb.CreateSessionRequest, opts ...grpc.CallOption) (*sppb.Session, error) {
  904. time.Sleep(10 * time.Millisecond)
  905. return rawServerStub.CreateSession(c, r, opts...)
  906. }
  907. spc := SessionPoolConfig{
  908. MinOpened: 10,
  909. MaxIdle: 10,
  910. healthCheckSampleInterval: time.Millisecond,
  911. getRPCClient: func() (sppb.SpannerClient, error) {
  912. return &serverClientMock, nil
  913. },
  914. }
  915. db := "mockdb"
  916. sp, err := newSessionPool(db, spc, nil)
  917. if err != nil {
  918. t.Fatalf("cannot create session pool: %v", err)
  919. }
  920. client := Client{
  921. database: db,
  922. idleSessions: sp,
  923. }
  924. defer func() {
  925. client.Close()
  926. sp.hc.close()
  927. sp.close()
  928. }()
  929. timeoutAmt := 2 * time.Second
  930. timeout := time.After(timeoutAmt)
  931. var numOpened uint64
  932. loop:
  933. for {
  934. select {
  935. case <-timeout:
  936. t.Fatalf("timed out after %v, got %d session(s), want %d", timeoutAmt, numOpened, spc.MinOpened)
  937. default:
  938. sp.mu.Lock()
  939. numOpened = sp.numOpened
  940. sp.mu.Unlock()
  941. if numOpened == 10 {
  942. break loop
  943. }
  944. }
  945. }
  946. }
  947. func (s1 *session) Equal(s2 *session) bool {
  948. return s1.client == s2.client &&
  949. s1.id == s2.id &&
  950. s1.pool == s2.pool &&
  951. s1.createTime == s2.createTime &&
  952. s1.valid == s2.valid &&
  953. s1.hcIndex == s2.hcIndex &&
  954. s1.idleList == s2.idleList &&
  955. s1.nextCheck.Equal(s2.nextCheck) &&
  956. s1.checkingHealth == s2.checkingHealth &&
  957. testEqual(s1.md, s2.md) &&
  958. bytes.Equal(s1.tx, s2.tx)
  959. }
  960. func waitFor(t *testing.T, assert func() error) {
  961. t.Helper()
  962. timeout := 15 * time.Second
  963. ta := time.After(timeout)
  964. for {
  965. select {
  966. case <-ta:
  967. if err := assert(); err != nil {
  968. t.Fatalf("after %v waiting, got %v", timeout, err)
  969. }
  970. return
  971. default:
  972. }
  973. if err := assert(); err != nil {
  974. // Fail. Let's pause and retry.
  975. time.Sleep(10 * time.Millisecond)
  976. continue
  977. }
  978. return
  979. }
  980. }