You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

395 lines
12 KiB

  1. /*
  2. Copyright 2017 Google LLC
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package spanner
  14. import (
  15. "context"
  16. "errors"
  17. "fmt"
  18. "reflect"
  19. "sync"
  20. "testing"
  21. "time"
  22. "cloud.google.com/go/spanner/internal/testutil"
  23. sppb "google.golang.org/genproto/googleapis/spanner/v1"
  24. "google.golang.org/grpc"
  25. "google.golang.org/grpc/codes"
  26. )
  27. // Single can only be used once.
  28. func TestSingle(t *testing.T) {
  29. t.Parallel()
  30. ctx := context.Background()
  31. client, _, mock, cleanup := serverClientMock(t, SessionPoolConfig{})
  32. defer cleanup()
  33. txn := client.Single()
  34. defer txn.Close()
  35. _, _, e := txn.acquire(ctx)
  36. if e != nil {
  37. t.Fatalf("Acquire for single use, got %v, want nil.", e)
  38. }
  39. _, _, e = txn.acquire(ctx)
  40. if wantErr := errTxClosed(); !testEqual(e, wantErr) {
  41. t.Fatalf("Second acquire for single use, got %v, want %v.", e, wantErr)
  42. }
  43. // Only one CreateSessionRequest is sent.
  44. if _, err := shouldHaveReceived(mock, []interface{}{&sppb.CreateSessionRequest{}}); err != nil {
  45. t.Fatal(err)
  46. }
  47. }
  48. // Re-using ReadOnlyTransaction: can recover from acquire failure.
  49. func TestReadOnlyTransaction_RecoverFromFailure(t *testing.T) {
  50. t.Parallel()
  51. ctx := context.Background()
  52. client, _, mock, cleanup := serverClientMock(t, SessionPoolConfig{})
  53. defer cleanup()
  54. txn := client.ReadOnlyTransaction()
  55. defer txn.Close()
  56. // First request will fail, which should trigger a retry.
  57. errUsr := errors.New("error")
  58. firstCall := true
  59. mock.BeginTransactionFn = func(c context.Context, r *sppb.BeginTransactionRequest, opts ...grpc.CallOption) (*sppb.Transaction, error) {
  60. if firstCall {
  61. mock.MockCloudSpannerClient.ReceivedRequests <- r
  62. firstCall = false
  63. return nil, errUsr
  64. }
  65. return mock.MockCloudSpannerClient.BeginTransaction(c, r, opts...)
  66. }
  67. _, _, e := txn.acquire(ctx)
  68. if wantErr := toSpannerError(errUsr); !testEqual(e, wantErr) {
  69. t.Fatalf("Acquire for multi use, got %v, want %v.", e, wantErr)
  70. }
  71. _, _, e = txn.acquire(ctx)
  72. if e != nil {
  73. t.Fatalf("Acquire for multi use, got %v, want nil.", e)
  74. }
  75. }
  76. // ReadOnlyTransaction: can not be used after close.
  77. func TestReadOnlyTransaction_UseAfterClose(t *testing.T) {
  78. t.Parallel()
  79. ctx := context.Background()
  80. client, _, _, cleanup := serverClientMock(t, SessionPoolConfig{})
  81. defer cleanup()
  82. txn := client.ReadOnlyTransaction()
  83. txn.Close()
  84. _, _, e := txn.acquire(ctx)
  85. if wantErr := errTxClosed(); !testEqual(e, wantErr) {
  86. t.Fatalf("Second acquire for multi use, got %v, want %v.", e, wantErr)
  87. }
  88. }
  89. // ReadOnlyTransaction: can be acquired concurrently.
  90. func TestReadOnlyTransaction_Concurrent(t *testing.T) {
  91. t.Parallel()
  92. ctx := context.Background()
  93. client, _, mock, cleanup := serverClientMock(t, SessionPoolConfig{})
  94. defer cleanup()
  95. txn := client.ReadOnlyTransaction()
  96. defer txn.Close()
  97. mock.Freeze()
  98. var (
  99. sh1 *sessionHandle
  100. sh2 *sessionHandle
  101. ts1 *sppb.TransactionSelector
  102. ts2 *sppb.TransactionSelector
  103. wg = sync.WaitGroup{}
  104. )
  105. acquire := func(sh **sessionHandle, ts **sppb.TransactionSelector) {
  106. defer wg.Done()
  107. var e error
  108. *sh, *ts, e = txn.acquire(ctx)
  109. if e != nil {
  110. t.Errorf("Concurrent acquire for multiuse, got %v, expect nil.", e)
  111. }
  112. }
  113. wg.Add(2)
  114. go acquire(&sh1, &ts1)
  115. go acquire(&sh2, &ts2)
  116. // TODO(deklerk): Get rid of this.
  117. <-time.After(100 * time.Millisecond)
  118. mock.Unfreeze()
  119. wg.Wait()
  120. if sh1.session.id != sh2.session.id {
  121. t.Fatalf("Expected acquire to get same session handle, got %v and %v.", sh1, sh2)
  122. }
  123. if !testEqual(ts1, ts2) {
  124. t.Fatalf("Expected acquire to get same transaction selector, got %v and %v.", ts1, ts2)
  125. }
  126. }
  127. func TestApply_Single(t *testing.T) {
  128. t.Parallel()
  129. ctx := context.Background()
  130. client, _, mock, cleanup := serverClientMock(t, SessionPoolConfig{})
  131. defer cleanup()
  132. ms := []*Mutation{
  133. Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}),
  134. Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}),
  135. }
  136. if _, e := client.Apply(ctx, ms, ApplyAtLeastOnce()); e != nil {
  137. t.Fatalf("applyAtLeastOnce retry on abort, got %v, want nil.", e)
  138. }
  139. if _, err := shouldHaveReceived(mock, []interface{}{
  140. &sppb.CreateSessionRequest{},
  141. &sppb.CommitRequest{},
  142. }); err != nil {
  143. t.Fatal(err)
  144. }
  145. }
  146. // Transaction retries on abort.
  147. func TestApply_RetryOnAbort(t *testing.T) {
  148. ctx := context.Background()
  149. t.Parallel()
  150. client, _, mock, cleanup := serverClientMock(t, SessionPoolConfig{})
  151. defer cleanup()
  152. // First commit will fail, and the retry will begin a new transaction.
  153. errAbrt := spannerErrorf(codes.Aborted, "")
  154. firstCommitCall := true
  155. mock.CommitFn = func(c context.Context, r *sppb.CommitRequest, opts ...grpc.CallOption) (*sppb.CommitResponse, error) {
  156. if firstCommitCall {
  157. mock.MockCloudSpannerClient.ReceivedRequests <- r
  158. firstCommitCall = false
  159. return nil, errAbrt
  160. }
  161. return mock.MockCloudSpannerClient.Commit(c, r, opts...)
  162. }
  163. ms := []*Mutation{
  164. Insert("Accounts", []string{"AccountId"}, []interface{}{int64(1)}),
  165. }
  166. if _, e := client.Apply(ctx, ms); e != nil {
  167. t.Fatalf("ReadWriteTransaction retry on abort, got %v, want nil.", e)
  168. }
  169. if _, err := shouldHaveReceived(mock, []interface{}{
  170. &sppb.CreateSessionRequest{},
  171. &sppb.BeginTransactionRequest{},
  172. &sppb.CommitRequest{}, // First commit fails.
  173. &sppb.BeginTransactionRequest{},
  174. &sppb.CommitRequest{}, // Second commit succeeds.
  175. }); err != nil {
  176. t.Fatal(err)
  177. }
  178. }
  179. // Tests that NotFound errors cause failures, and aren't retried.
  180. func TestTransaction_NotFound(t *testing.T) {
  181. t.Parallel()
  182. ctx := context.Background()
  183. client, _, mock, cleanup := serverClientMock(t, SessionPoolConfig{})
  184. defer cleanup()
  185. wantErr := spannerErrorf(codes.NotFound, "Session not found")
  186. mock.BeginTransactionFn = func(c context.Context, r *sppb.BeginTransactionRequest, opts ...grpc.CallOption) (*sppb.Transaction, error) {
  187. mock.MockCloudSpannerClient.ReceivedRequests <- r
  188. return nil, wantErr
  189. }
  190. mock.CommitFn = func(c context.Context, r *sppb.CommitRequest, opts ...grpc.CallOption) (*sppb.CommitResponse, error) {
  191. mock.MockCloudSpannerClient.ReceivedRequests <- r
  192. return nil, wantErr
  193. }
  194. txn := client.ReadOnlyTransaction()
  195. defer txn.Close()
  196. if _, _, got := txn.acquire(ctx); !testEqual(wantErr, got) {
  197. t.Fatalf("Expect acquire to fail, got %v, want %v.", got, wantErr)
  198. }
  199. // The failure should recycle the session, we expect it to be used in following requests.
  200. if got := txn.Query(ctx, NewStatement("SELECT 1")); !testEqual(wantErr, got.err) {
  201. t.Fatalf("Expect Query to fail, got %v, want %v.", got.err, wantErr)
  202. }
  203. if got := txn.Read(ctx, "Users", KeySets(Key{"alice"}, Key{"bob"}), []string{"name", "email"}); !testEqual(wantErr, got.err) {
  204. t.Fatalf("Expect Read to fail, got %v, want %v.", got.err, wantErr)
  205. }
  206. ms := []*Mutation{
  207. Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}),
  208. Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}),
  209. }
  210. if _, got := client.Apply(ctx, ms, ApplyAtLeastOnce()); !testEqual(wantErr, got) {
  211. t.Fatalf("Expect Apply to fail, got %v, want %v.", got, wantErr)
  212. }
  213. }
  214. // When an error is returned from the closure sent into ReadWriteTransaction, it
  215. // kicks off a rollback.
  216. func TestReadWriteTransaction_ErrorReturned(t *testing.T) {
  217. t.Parallel()
  218. ctx := context.Background()
  219. client, _, mock, cleanup := serverClientMock(t, SessionPoolConfig{})
  220. defer cleanup()
  221. want := errors.New("an error")
  222. _, got := client.ReadWriteTransaction(ctx, func(context.Context, *ReadWriteTransaction) error {
  223. return want
  224. })
  225. if got != want {
  226. t.Fatalf("got %+v, want %+v", got, want)
  227. }
  228. if _, err := shouldHaveReceived(mock, []interface{}{
  229. &sppb.CreateSessionRequest{},
  230. &sppb.BeginTransactionRequest{},
  231. &sppb.RollbackRequest{},
  232. }); err != nil {
  233. t.Fatal(err)
  234. }
  235. }
  236. func TestBatchDML_WithMultipleDML(t *testing.T) {
  237. t.Parallel()
  238. ctx := context.Background()
  239. client, _, mock, cleanup := serverClientMock(t, SessionPoolConfig{})
  240. defer cleanup()
  241. _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) {
  242. if _, err = tx.Update(ctx, Statement{SQL: "SELECT * FROM whatever"}); err != nil {
  243. return err
  244. }
  245. if _, err = tx.BatchUpdate(ctx, []Statement{{SQL: "SELECT * FROM whatever"}, {SQL: "SELECT * FROM whatever"}}); err != nil {
  246. return err
  247. }
  248. if _, err = tx.Update(ctx, Statement{SQL: "SELECT * FROM whatever"}); err != nil {
  249. return err
  250. }
  251. _, err = tx.BatchUpdate(ctx, []Statement{{SQL: "SELECT * FROM whatever"}})
  252. return err
  253. })
  254. if err != nil {
  255. t.Fatal(err)
  256. }
  257. gotReqs, err := shouldHaveReceived(mock, []interface{}{
  258. &sppb.CreateSessionRequest{},
  259. &sppb.BeginTransactionRequest{},
  260. &sppb.ExecuteSqlRequest{},
  261. &sppb.ExecuteBatchDmlRequest{},
  262. &sppb.ExecuteSqlRequest{},
  263. &sppb.ExecuteBatchDmlRequest{},
  264. &sppb.CommitRequest{},
  265. })
  266. if err != nil {
  267. t.Fatal(err)
  268. }
  269. if got, want := gotReqs[2].(*sppb.ExecuteSqlRequest).Seqno, int64(1); got != want {
  270. t.Errorf("got %d, want %d", got, want)
  271. }
  272. if got, want := gotReqs[3].(*sppb.ExecuteBatchDmlRequest).Seqno, int64(2); got != want {
  273. t.Errorf("got %d, want %d", got, want)
  274. }
  275. if got, want := gotReqs[4].(*sppb.ExecuteSqlRequest).Seqno, int64(3); got != want {
  276. t.Errorf("got %d, want %d", got, want)
  277. }
  278. if got, want := gotReqs[5].(*sppb.ExecuteBatchDmlRequest).Seqno, int64(4); got != want {
  279. t.Errorf("got %d, want %d", got, want)
  280. }
  281. }
  282. // shouldHaveReceived asserts that exactly expectedRequests were present in
  283. // the server's ReceivedRequests channel. It only looks at type, not contents.
  284. //
  285. // Note: this in-place modifies serverClientMock by popping items off the
  286. // ReceivedRequests channel.
  287. func shouldHaveReceived(mock *testutil.FuncMock, want []interface{}) ([]interface{}, error) {
  288. got := drainRequests(mock)
  289. if len(got) != len(want) {
  290. var gotMsg string
  291. for _, r := range got {
  292. gotMsg += fmt.Sprintf("%v: %+v]\n", reflect.TypeOf(r), r)
  293. }
  294. var wantMsg string
  295. for _, r := range want {
  296. wantMsg += fmt.Sprintf("%v: %+v]\n", reflect.TypeOf(r), r)
  297. }
  298. return got, fmt.Errorf("got %d requests, want %d requests:\ngot:\n%s\nwant:\n%s", len(got), len(want), gotMsg, wantMsg)
  299. }
  300. for i, want := range want {
  301. if reflect.TypeOf(got[i]) != reflect.TypeOf(want) {
  302. return got, fmt.Errorf("request %d: got %+v, want %+v", i, reflect.TypeOf(got[i]), reflect.TypeOf(want))
  303. }
  304. }
  305. return got, nil
  306. }
  307. func drainRequests(mock *testutil.FuncMock) []interface{} {
  308. var reqs []interface{}
  309. loop:
  310. for {
  311. select {
  312. case req := <-mock.ReceivedRequests:
  313. reqs = append(reqs, req)
  314. default:
  315. break loop
  316. }
  317. }
  318. return reqs
  319. }
  320. // serverClientMock sets up a client configured to a NewMockCloudSpannerClient
  321. // that is wrapped with a function-injectable wrapper.
  322. //
  323. // Note: be sure to call cleanup!
  324. func serverClientMock(t *testing.T, spc SessionPoolConfig) (_ *Client, _ *sessionPool, _ *testutil.FuncMock, cleanup func()) {
  325. rawServerStub := testutil.NewMockCloudSpannerClient(t)
  326. serverClientMock := testutil.FuncMock{MockCloudSpannerClient: rawServerStub}
  327. spc.getRPCClient = func() (sppb.SpannerClient, error) {
  328. return &serverClientMock, nil
  329. }
  330. db := "mockdb"
  331. sp, err := newSessionPool(db, spc, nil)
  332. if err != nil {
  333. t.Fatalf("cannot create session pool: %v", err)
  334. }
  335. client := Client{
  336. database: db,
  337. idleSessions: sp,
  338. }
  339. cleanup = func() {
  340. client.Close()
  341. sp.hc.close()
  342. sp.close()
  343. }
  344. return &client, sp, &serverClientMock, cleanup
  345. }