You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

2499 lines
75 KiB

  1. /*
  2. Copyright 2017 Google LLC
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package spanner
  14. import (
  15. "context"
  16. "errors"
  17. "flag"
  18. "fmt"
  19. "log"
  20. "math"
  21. "os"
  22. "reflect"
  23. "strings"
  24. "sync"
  25. "testing"
  26. "time"
  27. "cloud.google.com/go/civil"
  28. "cloud.google.com/go/internal/testutil"
  29. "cloud.google.com/go/internal/uid"
  30. database "cloud.google.com/go/spanner/admin/database/apiv1"
  31. "google.golang.org/api/iterator"
  32. "google.golang.org/api/option"
  33. adminpb "google.golang.org/genproto/googleapis/spanner/admin/database/v1"
  34. "google.golang.org/grpc/codes"
  35. "google.golang.org/grpc/status"
  36. )
  37. var (
  38. // testProjectID specifies the project used for testing.
  39. // It can be changed by setting environment variable GCLOUD_TESTS_GOLANG_PROJECT_ID.
  40. testProjectID = testutil.ProjID()
  41. dbNameSpace = uid.NewSpace("gotest", &uid.Options{Sep: '_', Short: true})
  42. // TODO(deklerk) When we can programmatically create instances, we should use
  43. // uid.New as the test instance name.
  44. // testInstanceID specifies the Cloud Spanner instance used for testing.
  45. testInstanceID = "go-integration-test"
  46. testTable = "TestTable"
  47. testTableIndex = "TestTableByValue"
  48. testTableColumns = []string{"Key", "StringValue"}
  49. // admin is a spanner.DatabaseAdminClient.
  50. admin *database.DatabaseAdminClient
  51. singerDBStatements = []string{
  52. `CREATE TABLE Singers (
  53. SingerId INT64 NOT NULL,
  54. FirstName STRING(1024),
  55. LastName STRING(1024),
  56. SingerInfo BYTES(MAX)
  57. ) PRIMARY KEY (SingerId)`,
  58. `CREATE INDEX SingerByName ON Singers(FirstName, LastName)`,
  59. `CREATE TABLE Accounts (
  60. AccountId INT64 NOT NULL,
  61. Nickname STRING(100),
  62. Balance INT64 NOT NULL,
  63. ) PRIMARY KEY (AccountId)`,
  64. `CREATE INDEX AccountByNickname ON Accounts(Nickname) STORING (Balance)`,
  65. `CREATE TABLE Types (
  66. RowID INT64 NOT NULL,
  67. String STRING(MAX),
  68. StringArray ARRAY<STRING(MAX)>,
  69. Bytes BYTES(MAX),
  70. BytesArray ARRAY<BYTES(MAX)>,
  71. Int64a INT64,
  72. Int64Array ARRAY<INT64>,
  73. Bool BOOL,
  74. BoolArray ARRAY<BOOL>,
  75. Float64 FLOAT64,
  76. Float64Array ARRAY<FLOAT64>,
  77. Date DATE,
  78. DateArray ARRAY<DATE>,
  79. Timestamp TIMESTAMP,
  80. TimestampArray ARRAY<TIMESTAMP>,
  81. ) PRIMARY KEY (RowID)`,
  82. }
  83. readDBStatements = []string{
  84. `CREATE TABLE TestTable (
  85. Key STRING(MAX) NOT NULL,
  86. StringValue STRING(MAX)
  87. ) PRIMARY KEY (Key)`,
  88. `CREATE INDEX TestTableByValue ON TestTable(StringValue)`,
  89. `CREATE INDEX TestTableByValueDesc ON TestTable(StringValue DESC)`,
  90. }
  91. simpleDBStatements = []string{
  92. `CREATE TABLE test (
  93. a STRING(1024),
  94. b STRING(1024),
  95. ) PRIMARY KEY (a)`,
  96. }
  97. simpleDBTableColumns = []string{"a", "b"}
  98. ctsDBStatements = []string{
  99. `CREATE TABLE TestTable (
  100. Key STRING(MAX) NOT NULL,
  101. Ts TIMESTAMP OPTIONS (allow_commit_timestamp = true),
  102. ) PRIMARY KEY (Key)`,
  103. }
  104. )
  105. const (
  106. str1 = "alice"
  107. str2 = "a@example.com"
  108. )
  109. func TestMain(m *testing.M) {
  110. cleanup := initIntegrationTests()
  111. res := m.Run()
  112. cleanup()
  113. os.Exit(res)
  114. }
  115. func initIntegrationTests() func() {
  116. ctx := context.Background()
  117. flag.Parse() // needed for testing.Short()
  118. noop := func() {}
  119. if testing.Short() {
  120. log.Println("Integration tests skipped in -short mode.")
  121. return noop
  122. }
  123. if testProjectID == "" {
  124. log.Println("Integration tests skipped: GCLOUD_TESTS_GOLANG_PROJECT_ID is missing")
  125. return noop
  126. }
  127. ts := testutil.TokenSource(ctx, AdminScope, Scope)
  128. if ts == nil {
  129. log.Printf("Integration test skipped: cannot get service account credential from environment variable %v", "GCLOUD_TESTS_GOLANG_KEY")
  130. return noop
  131. }
  132. var err error
  133. // Create Admin client and Data client.
  134. admin, err = database.NewDatabaseAdminClient(ctx, option.WithTokenSource(ts), option.WithEndpoint(endpoint))
  135. if err != nil {
  136. log.Fatalf("cannot create admin client: %v", err)
  137. }
  138. return func() {
  139. cleanupDatabases()
  140. admin.Close()
  141. }
  142. }
  143. // Test SingleUse transaction.
  144. func TestIntegration_SingleUse(t *testing.T) {
  145. ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
  146. defer cancel()
  147. // Set up testing environment.
  148. client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements)
  149. defer cleanup()
  150. writes := []struct {
  151. row []interface{}
  152. ts time.Time
  153. }{
  154. {row: []interface{}{1, "Marc", "Foo"}},
  155. {row: []interface{}{2, "Tars", "Bar"}},
  156. {row: []interface{}{3, "Alpha", "Beta"}},
  157. {row: []interface{}{4, "Last", "End"}},
  158. }
  159. // Try to write four rows through the Apply API.
  160. for i, w := range writes {
  161. var err error
  162. m := InsertOrUpdate("Singers",
  163. []string{"SingerId", "FirstName", "LastName"},
  164. w.row)
  165. if writes[i].ts, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil {
  166. t.Fatal(err)
  167. }
  168. }
  169. // For testing timestamp bound staleness.
  170. <-time.After(time.Second)
  171. // Test reading rows with different timestamp bounds.
  172. for i, test := range []struct {
  173. want [][]interface{}
  174. tb TimestampBound
  175. checkTs func(time.Time) error
  176. }{
  177. {
  178. // strong
  179. [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}},
  180. StrongRead(),
  181. func(ts time.Time) error {
  182. // writes[3] is the last write, all subsequent strong read should have a timestamp larger than that.
  183. if ts.Before(writes[3].ts) {
  184. return fmt.Errorf("read got timestamp %v, want it to be no later than %v", ts, writes[3].ts)
  185. }
  186. return nil
  187. },
  188. },
  189. {
  190. // min_read_timestamp
  191. [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}},
  192. MinReadTimestamp(writes[3].ts),
  193. func(ts time.Time) error {
  194. if ts.Before(writes[3].ts) {
  195. return fmt.Errorf("read got timestamp %v, want it to be no later than %v", ts, writes[3].ts)
  196. }
  197. return nil
  198. },
  199. },
  200. {
  201. // max_staleness
  202. [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}},
  203. MaxStaleness(time.Second),
  204. func(ts time.Time) error {
  205. if ts.Before(writes[3].ts) {
  206. return fmt.Errorf("read got timestamp %v, want it to be no later than %v", ts, writes[3].ts)
  207. }
  208. return nil
  209. },
  210. },
  211. {
  212. // read_timestamp
  213. [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}},
  214. ReadTimestamp(writes[2].ts),
  215. func(ts time.Time) error {
  216. if ts != writes[2].ts {
  217. return fmt.Errorf("read got timestamp %v, want %v", ts, writes[2].ts)
  218. }
  219. return nil
  220. },
  221. },
  222. {
  223. // exact_staleness
  224. nil,
  225. // Specify a staleness which should be already before this test because
  226. // context timeout is set to be 10s.
  227. ExactStaleness(11 * time.Second),
  228. func(ts time.Time) error {
  229. if ts.After(writes[0].ts) {
  230. return fmt.Errorf("read got timestamp %v, want it to be no earlier than %v", ts, writes[0].ts)
  231. }
  232. return nil
  233. },
  234. },
  235. } {
  236. // SingleUse.Query
  237. su := client.Single().WithTimestampBound(test.tb)
  238. got, err := readAll(su.Query(
  239. ctx,
  240. Statement{
  241. "SELECT SingerId, FirstName, LastName FROM Singers WHERE SingerId IN (@id1, @id3, @id4)",
  242. map[string]interface{}{"id1": int64(1), "id3": int64(3), "id4": int64(4)},
  243. }))
  244. if err != nil {
  245. t.Errorf("%d: SingleUse.Query returns error %v, want nil", i, err)
  246. }
  247. if !testEqual(got, test.want) {
  248. t.Errorf("%d: got unexpected result from SingleUse.Query: %v, want %v", i, got, test.want)
  249. }
  250. rts, err := su.Timestamp()
  251. if err != nil {
  252. t.Errorf("%d: SingleUse.Query doesn't return a timestamp, error: %v", i, err)
  253. }
  254. if err := test.checkTs(rts); err != nil {
  255. t.Errorf("%d: SingleUse.Query doesn't return expected timestamp: %v", i, err)
  256. }
  257. // SingleUse.Read
  258. su = client.Single().WithTimestampBound(test.tb)
  259. got, err = readAll(su.Read(ctx, "Singers", KeySets(Key{1}, Key{3}, Key{4}), []string{"SingerId", "FirstName", "LastName"}))
  260. if err != nil {
  261. t.Errorf("%d: SingleUse.Read returns error %v, want nil", i, err)
  262. }
  263. if !testEqual(got, test.want) {
  264. t.Errorf("%d: got unexpected result from SingleUse.Read: %v, want %v", i, got, test.want)
  265. }
  266. rts, err = su.Timestamp()
  267. if err != nil {
  268. t.Errorf("%d: SingleUse.Read doesn't return a timestamp, error: %v", i, err)
  269. }
  270. if err := test.checkTs(rts); err != nil {
  271. t.Errorf("%d: SingleUse.Read doesn't return expected timestamp: %v", i, err)
  272. }
  273. // SingleUse.ReadRow
  274. got = nil
  275. for _, k := range []Key{{1}, {3}, {4}} {
  276. su = client.Single().WithTimestampBound(test.tb)
  277. r, err := su.ReadRow(ctx, "Singers", k, []string{"SingerId", "FirstName", "LastName"})
  278. if err != nil {
  279. continue
  280. }
  281. v, err := rowToValues(r)
  282. if err != nil {
  283. continue
  284. }
  285. got = append(got, v)
  286. rts, err = su.Timestamp()
  287. if err != nil {
  288. t.Errorf("%d: SingleUse.ReadRow(%v) doesn't return a timestamp, error: %v", i, k, err)
  289. }
  290. if err := test.checkTs(rts); err != nil {
  291. t.Errorf("%d: SingleUse.ReadRow(%v) doesn't return expected timestamp: %v", i, k, err)
  292. }
  293. }
  294. if !testEqual(got, test.want) {
  295. t.Errorf("%d: got unexpected results from SingleUse.ReadRow: %v, want %v", i, got, test.want)
  296. }
  297. // SingleUse.ReadUsingIndex
  298. su = client.Single().WithTimestampBound(test.tb)
  299. got, err = readAll(su.ReadUsingIndex(ctx, "Singers", "SingerByName", KeySets(Key{"Marc", "Foo"}, Key{"Alpha", "Beta"}, Key{"Last", "End"}), []string{"SingerId", "FirstName", "LastName"}))
  300. if err != nil {
  301. t.Errorf("%d: SingleUse.ReadUsingIndex returns error %v, want nil", i, err)
  302. }
  303. // The results from ReadUsingIndex is sorted by the index rather than primary key.
  304. if len(got) != len(test.want) {
  305. t.Errorf("%d: got unexpected result from SingleUse.ReadUsingIndex: %v, want %v", i, got, test.want)
  306. }
  307. for j, g := range got {
  308. if j > 0 {
  309. prev := got[j-1][1].(string) + got[j-1][2].(string)
  310. curr := got[j][1].(string) + got[j][2].(string)
  311. if strings.Compare(prev, curr) > 0 {
  312. t.Errorf("%d: SingleUse.ReadUsingIndex fails to order rows by index keys, %v should be after %v", i, got[j-1], got[j])
  313. }
  314. }
  315. found := false
  316. for _, w := range test.want {
  317. if testEqual(g, w) {
  318. found = true
  319. }
  320. }
  321. if !found {
  322. t.Errorf("%d: got unexpected result from SingleUse.ReadUsingIndex: %v, want %v", i, got, test.want)
  323. break
  324. }
  325. }
  326. rts, err = su.Timestamp()
  327. if err != nil {
  328. t.Errorf("%d: SingleUse.ReadUsingIndex doesn't return a timestamp, error: %v", i, err)
  329. }
  330. if err := test.checkTs(rts); err != nil {
  331. t.Errorf("%d: SingleUse.ReadUsingIndex doesn't return expected timestamp: %v", i, err)
  332. }
  333. }
  334. // Reading with limit.
  335. su := client.Single()
  336. const limit = 1
  337. gotRows, err := readAll(su.ReadWithOptions(ctx, "Singers", KeySets(Key{1}, Key{3}, Key{4}),
  338. []string{"SingerId", "FirstName", "LastName"}, &ReadOptions{Limit: limit}))
  339. if err != nil {
  340. t.Errorf("SingleUse.ReadWithOptions returns error %v, want nil", err)
  341. }
  342. if got, want := len(gotRows), limit; got != want {
  343. t.Errorf("got %d, want %d", got, want)
  344. }
  345. }
  346. // Test ReadOnlyTransaction. The testsuite is mostly like SingleUse, except it
  347. // also tests for a single timestamp across multiple reads.
  348. func TestIntegration_ReadOnlyTransaction(t *testing.T) {
  349. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
  350. defer cancel()
  351. // Set up testing environment.
  352. client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements)
  353. defer cleanup()
  354. writes := []struct {
  355. row []interface{}
  356. ts time.Time
  357. }{
  358. {row: []interface{}{1, "Marc", "Foo"}},
  359. {row: []interface{}{2, "Tars", "Bar"}},
  360. {row: []interface{}{3, "Alpha", "Beta"}},
  361. {row: []interface{}{4, "Last", "End"}},
  362. }
  363. // Try to write four rows through the Apply API.
  364. for i, w := range writes {
  365. var err error
  366. m := InsertOrUpdate("Singers",
  367. []string{"SingerId", "FirstName", "LastName"},
  368. w.row)
  369. if writes[i].ts, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil {
  370. t.Fatal(err)
  371. }
  372. }
  373. // For testing timestamp bound staleness.
  374. <-time.After(time.Second)
  375. // Test reading rows with different timestamp bounds.
  376. for i, test := range []struct {
  377. want [][]interface{}
  378. tb TimestampBound
  379. checkTs func(time.Time) error
  380. }{
  381. // Note: min_read_timestamp and max_staleness are not supported by ReadOnlyTransaction. See
  382. // API document for more details.
  383. {
  384. // strong
  385. [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}},
  386. StrongRead(),
  387. func(ts time.Time) error {
  388. if ts.Before(writes[3].ts) {
  389. return fmt.Errorf("read got timestamp %v, want it to be no later than %v", ts, writes[3].ts)
  390. }
  391. return nil
  392. },
  393. },
  394. {
  395. // read_timestamp
  396. [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}},
  397. ReadTimestamp(writes[2].ts),
  398. func(ts time.Time) error {
  399. if ts != writes[2].ts {
  400. return fmt.Errorf("read got timestamp %v, expect %v", ts, writes[2].ts)
  401. }
  402. return nil
  403. },
  404. },
  405. {
  406. // exact_staleness
  407. nil,
  408. // Specify a staleness which should be already before this test because
  409. // context timeout is set to be 10s.
  410. ExactStaleness(11 * time.Second),
  411. func(ts time.Time) error {
  412. if ts.After(writes[0].ts) {
  413. return fmt.Errorf("read got timestamp %v, want it to be no earlier than %v", ts, writes[0].ts)
  414. }
  415. return nil
  416. },
  417. },
  418. } {
  419. // ReadOnlyTransaction.Query
  420. ro := client.ReadOnlyTransaction().WithTimestampBound(test.tb)
  421. got, err := readAll(ro.Query(
  422. ctx,
  423. Statement{
  424. "SELECT SingerId, FirstName, LastName FROM Singers WHERE SingerId IN (@id1, @id3, @id4)",
  425. map[string]interface{}{"id1": int64(1), "id3": int64(3), "id4": int64(4)},
  426. }))
  427. if err != nil {
  428. t.Errorf("%d: ReadOnlyTransaction.Query returns error %v, want nil", i, err)
  429. }
  430. if !testEqual(got, test.want) {
  431. t.Errorf("%d: got unexpected result from ReadOnlyTransaction.Query: %v, want %v", i, got, test.want)
  432. }
  433. rts, err := ro.Timestamp()
  434. if err != nil {
  435. t.Errorf("%d: ReadOnlyTransaction.Query doesn't return a timestamp, error: %v", i, err)
  436. }
  437. if err := test.checkTs(rts); err != nil {
  438. t.Errorf("%d: ReadOnlyTransaction.Query doesn't return expected timestamp: %v", i, err)
  439. }
  440. roTs := rts
  441. // ReadOnlyTransaction.Read
  442. got, err = readAll(ro.Read(ctx, "Singers", KeySets(Key{1}, Key{3}, Key{4}), []string{"SingerId", "FirstName", "LastName"}))
  443. if err != nil {
  444. t.Errorf("%d: ReadOnlyTransaction.Read returns error %v, want nil", i, err)
  445. }
  446. if !testEqual(got, test.want) {
  447. t.Errorf("%d: got unexpected result from ReadOnlyTransaction.Read: %v, want %v", i, got, test.want)
  448. }
  449. rts, err = ro.Timestamp()
  450. if err != nil {
  451. t.Errorf("%d: ReadOnlyTransaction.Read doesn't return a timestamp, error: %v", i, err)
  452. }
  453. if err := test.checkTs(rts); err != nil {
  454. t.Errorf("%d: ReadOnlyTransaction.Read doesn't return expected timestamp: %v", i, err)
  455. }
  456. if roTs != rts {
  457. t.Errorf("%d: got two read timestamps: %v, %v, want ReadOnlyTransaction to return always the same read timestamp", i, roTs, rts)
  458. }
  459. // ReadOnlyTransaction.ReadRow
  460. got = nil
  461. for _, k := range []Key{{1}, {3}, {4}} {
  462. r, err := ro.ReadRow(ctx, "Singers", k, []string{"SingerId", "FirstName", "LastName"})
  463. if err != nil {
  464. continue
  465. }
  466. v, err := rowToValues(r)
  467. if err != nil {
  468. continue
  469. }
  470. got = append(got, v)
  471. rts, err = ro.Timestamp()
  472. if err != nil {
  473. t.Errorf("%d: ReadOnlyTransaction.ReadRow(%v) doesn't return a timestamp, error: %v", i, k, err)
  474. }
  475. if err := test.checkTs(rts); err != nil {
  476. t.Errorf("%d: ReadOnlyTransaction.ReadRow(%v) doesn't return expected timestamp: %v", i, k, err)
  477. }
  478. if roTs != rts {
  479. t.Errorf("%d: got two read timestamps: %v, %v, want ReadOnlyTransaction to return always the same read timestamp", i, roTs, rts)
  480. }
  481. }
  482. if !testEqual(got, test.want) {
  483. t.Errorf("%d: got unexpected results from ReadOnlyTransaction.ReadRow: %v, want %v", i, got, test.want)
  484. }
  485. // SingleUse.ReadUsingIndex
  486. got, err = readAll(ro.ReadUsingIndex(ctx, "Singers", "SingerByName", KeySets(Key{"Marc", "Foo"}, Key{"Alpha", "Beta"}, Key{"Last", "End"}), []string{"SingerId", "FirstName", "LastName"}))
  487. if err != nil {
  488. t.Errorf("%d: ReadOnlyTransaction.ReadUsingIndex returns error %v, want nil", i, err)
  489. }
  490. // The results from ReadUsingIndex is sorted by the index rather than primary key.
  491. if len(got) != len(test.want) {
  492. t.Errorf("%d: got unexpected result from ReadOnlyTransaction.ReadUsingIndex: %v, want %v", i, got, test.want)
  493. }
  494. for j, g := range got {
  495. if j > 0 {
  496. prev := got[j-1][1].(string) + got[j-1][2].(string)
  497. curr := got[j][1].(string) + got[j][2].(string)
  498. if strings.Compare(prev, curr) > 0 {
  499. t.Errorf("%d: ReadOnlyTransaction.ReadUsingIndex fails to order rows by index keys, %v should be after %v", i, got[j-1], got[j])
  500. }
  501. }
  502. found := false
  503. for _, w := range test.want {
  504. if testEqual(g, w) {
  505. found = true
  506. }
  507. }
  508. if !found {
  509. t.Errorf("%d: got unexpected result from ReadOnlyTransaction.ReadUsingIndex: %v, want %v", i, got, test.want)
  510. break
  511. }
  512. }
  513. rts, err = ro.Timestamp()
  514. if err != nil {
  515. t.Errorf("%d: ReadOnlyTransaction.ReadUsingIndex doesn't return a timestamp, error: %v", i, err)
  516. }
  517. if err := test.checkTs(rts); err != nil {
  518. t.Errorf("%d: ReadOnlyTransaction.ReadUsingIndex doesn't return expected timestamp: %v", i, err)
  519. }
  520. if roTs != rts {
  521. t.Errorf("%d: got two read timestamps: %v, %v, want ReadOnlyTransaction to return always the same read timestamp", i, roTs, rts)
  522. }
  523. ro.Close()
  524. }
  525. }
  526. // Test ReadOnlyTransaction with different timestamp bound when there's an update at the same time.
  527. func TestIntegration_UpdateDuringRead(t *testing.T) {
  528. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
  529. defer cancel()
  530. client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements)
  531. defer cleanup()
  532. for i, tb := range []TimestampBound{
  533. StrongRead(),
  534. ReadTimestamp(time.Now().Add(-time.Minute * 30)), // version GC is 1 hour
  535. ExactStaleness(time.Minute * 30),
  536. } {
  537. ro := client.ReadOnlyTransaction().WithTimestampBound(tb)
  538. _, err := ro.ReadRow(ctx, "Singers", Key{i}, []string{"SingerId"})
  539. if ErrCode(err) != codes.NotFound {
  540. t.Errorf("%d: ReadOnlyTransaction.ReadRow before write returns error: %v, want NotFound", i, err)
  541. }
  542. m := InsertOrUpdate("Singers", []string{"SingerId"}, []interface{}{i})
  543. if _, err := client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil {
  544. t.Fatal(err)
  545. }
  546. _, err = ro.ReadRow(ctx, "Singers", Key{i}, []string{"SingerId"})
  547. if ErrCode(err) != codes.NotFound {
  548. t.Errorf("%d: ReadOnlyTransaction.ReadRow after write returns error: %v, want NotFound", i, err)
  549. }
  550. }
  551. }
  552. // Test ReadWriteTransaction.
  553. func TestIntegration_ReadWriteTransaction(t *testing.T) {
  554. // Give a longer deadline because of transaction backoffs.
  555. ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
  556. defer cancel()
  557. client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements)
  558. defer cleanup()
  559. // Set up two accounts
  560. accounts := []*Mutation{
  561. Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}),
  562. Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}),
  563. }
  564. if _, err := client.Apply(ctx, accounts, ApplyAtLeastOnce()); err != nil {
  565. t.Fatal(err)
  566. }
  567. wg := sync.WaitGroup{}
  568. readBalance := func(iter *RowIterator) (int64, error) {
  569. defer iter.Stop()
  570. var bal int64
  571. for {
  572. row, err := iter.Next()
  573. if err == iterator.Done {
  574. return bal, nil
  575. }
  576. if err != nil {
  577. return 0, err
  578. }
  579. if err := row.Column(0, &bal); err != nil {
  580. return 0, err
  581. }
  582. }
  583. }
  584. for i := 0; i < 20; i++ {
  585. wg.Add(1)
  586. go func(iter int) {
  587. defer wg.Done()
  588. _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
  589. // Query Foo's balance and Bar's balance.
  590. bf, e := readBalance(tx.Query(ctx,
  591. Statement{"SELECT Balance FROM Accounts WHERE AccountId = @id", map[string]interface{}{"id": int64(1)}}))
  592. if e != nil {
  593. return e
  594. }
  595. bb, e := readBalance(tx.Read(ctx, "Accounts", KeySets(Key{int64(2)}), []string{"Balance"}))
  596. if e != nil {
  597. return e
  598. }
  599. if bf <= 0 {
  600. return nil
  601. }
  602. bf--
  603. bb++
  604. return tx.BufferWrite([]*Mutation{
  605. Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(1), bf}),
  606. Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(2), bb}),
  607. })
  608. })
  609. if err != nil {
  610. t.Errorf("%d: failed to execute transaction: %v", iter, err)
  611. }
  612. }(i)
  613. }
  614. // Because of context timeout, all goroutines will eventually return.
  615. wg.Wait()
  616. _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
  617. var bf, bb int64
  618. r, e := tx.ReadRow(ctx, "Accounts", Key{int64(1)}, []string{"Balance"})
  619. if e != nil {
  620. return e
  621. }
  622. if ce := r.Column(0, &bf); ce != nil {
  623. return ce
  624. }
  625. bb, e = readBalance(tx.ReadUsingIndex(ctx, "Accounts", "AccountByNickname", KeySets(Key{"Bar"}), []string{"Balance"}))
  626. if e != nil {
  627. return e
  628. }
  629. if bf != 30 || bb != 21 {
  630. t.Errorf("Foo's balance is now %v and Bar's balance is now %v, want %v and %v", bf, bb, 30, 21)
  631. }
  632. return nil
  633. })
  634. if err != nil {
  635. t.Errorf("failed to check balances: %v", err)
  636. }
  637. }
  638. func TestIntegration_Reads(t *testing.T) {
  639. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
  640. defer cancel()
  641. // Set up testing environment.
  642. client, _, cleanup := prepareIntegrationTest(ctx, t, readDBStatements)
  643. defer cleanup()
  644. // Includes k0..k14. Strings sort lexically, eg "k1" < "k10" < "k2".
  645. var ms []*Mutation
  646. for i := 0; i < 15; i++ {
  647. ms = append(ms, InsertOrUpdate(testTable,
  648. testTableColumns,
  649. []interface{}{fmt.Sprintf("k%d", i), fmt.Sprintf("v%d", i)}))
  650. }
  651. // Don't use ApplyAtLeastOnce, so we can test the other code path.
  652. if _, err := client.Apply(ctx, ms); err != nil {
  653. t.Fatal(err)
  654. }
  655. // Empty read.
  656. rows, err := readAllTestTable(client.Single().Read(ctx, testTable,
  657. KeyRange{Start: Key{"k99"}, End: Key{"z"}}, testTableColumns))
  658. if err != nil {
  659. t.Fatal(err)
  660. }
  661. if got, want := len(rows), 0; got != want {
  662. t.Errorf("got %d, want %d", got, want)
  663. }
  664. // Index empty read.
  665. rows, err = readAllTestTable(client.Single().ReadUsingIndex(ctx, testTable, testTableIndex,
  666. KeyRange{Start: Key{"v99"}, End: Key{"z"}}, testTableColumns))
  667. if err != nil {
  668. t.Fatal(err)
  669. }
  670. if got, want := len(rows), 0; got != want {
  671. t.Errorf("got %d, want %d", got, want)
  672. }
  673. // Point read.
  674. row, err := client.Single().ReadRow(ctx, testTable, Key{"k1"}, testTableColumns)
  675. if err != nil {
  676. t.Fatal(err)
  677. }
  678. var got testTableRow
  679. if err := row.ToStruct(&got); err != nil {
  680. t.Fatal(err)
  681. }
  682. if want := (testTableRow{"k1", "v1"}); got != want {
  683. t.Errorf("got %v, want %v", got, want)
  684. }
  685. // Point read not found.
  686. _, err = client.Single().ReadRow(ctx, testTable, Key{"k999"}, testTableColumns)
  687. if ErrCode(err) != codes.NotFound {
  688. t.Fatalf("got %v, want NotFound", err)
  689. }
  690. // No index point read not found, because Go does not have ReadRowUsingIndex.
  691. rangeReads(ctx, t, client)
  692. indexRangeReads(ctx, t, client)
  693. }
  694. func TestIntegration_EarlyTimestamp(t *testing.T) {
  695. // Test that we can get the timestamp from a read-only transaction as
  696. // soon as we have read at least one row.
  697. ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
  698. defer cancel()
  699. // Set up testing environment.
  700. client, _, cleanup := prepareIntegrationTest(ctx, t, readDBStatements)
  701. defer cleanup()
  702. var ms []*Mutation
  703. for i := 0; i < 3; i++ {
  704. ms = append(ms, InsertOrUpdate(testTable,
  705. testTableColumns,
  706. []interface{}{fmt.Sprintf("k%d", i), fmt.Sprintf("v%d", i)}))
  707. }
  708. if _, err := client.Apply(ctx, ms, ApplyAtLeastOnce()); err != nil {
  709. t.Fatal(err)
  710. }
  711. txn := client.Single()
  712. iter := txn.Read(ctx, testTable, AllKeys(), testTableColumns)
  713. defer iter.Stop()
  714. // In single-use transaction, we should get an error before reading anything.
  715. if _, err := txn.Timestamp(); err == nil {
  716. t.Error("wanted error, got nil")
  717. }
  718. // After reading one row, the timestamp should be available.
  719. _, err := iter.Next()
  720. if err != nil {
  721. t.Fatal(err)
  722. }
  723. if _, err := txn.Timestamp(); err != nil {
  724. t.Errorf("got %v, want nil", err)
  725. }
  726. txn = client.ReadOnlyTransaction()
  727. defer txn.Close()
  728. iter = txn.Read(ctx, testTable, AllKeys(), testTableColumns)
  729. defer iter.Stop()
  730. // In an ordinary read-only transaction, the timestamp should be
  731. // available immediately.
  732. if _, err := txn.Timestamp(); err != nil {
  733. t.Errorf("got %v, want nil", err)
  734. }
  735. }
  736. func TestIntegration_NestedTransaction(t *testing.T) {
  737. // You cannot use a transaction from inside a read-write transaction.
  738. ctx := context.Background()
  739. client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements)
  740. defer cleanup()
  741. _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
  742. _, err := client.ReadWriteTransaction(ctx,
  743. func(context.Context, *ReadWriteTransaction) error { return nil })
  744. if ErrCode(err) != codes.FailedPrecondition {
  745. t.Fatalf("got %v, want FailedPrecondition", err)
  746. }
  747. _, err = client.Single().ReadRow(ctx, "Singers", Key{1}, []string{"SingerId"})
  748. if ErrCode(err) != codes.FailedPrecondition {
  749. t.Fatalf("got %v, want FailedPrecondition", err)
  750. }
  751. rot := client.ReadOnlyTransaction()
  752. defer rot.Close()
  753. _, err = rot.ReadRow(ctx, "Singers", Key{1}, []string{"SingerId"})
  754. if ErrCode(err) != codes.FailedPrecondition {
  755. t.Fatalf("got %v, want FailedPrecondition", err)
  756. }
  757. return nil
  758. })
  759. if err != nil {
  760. t.Fatal(err)
  761. }
  762. }
  763. // Test client recovery on database recreation.
  764. func TestIntegration_DbRemovalRecovery(t *testing.T) {
  765. ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
  766. defer cancel()
  767. client, dbPath, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements)
  768. defer cleanup()
  769. // Drop the testing database.
  770. if err := admin.DropDatabase(ctx, &adminpb.DropDatabaseRequest{Database: dbPath}); err != nil {
  771. t.Fatalf("failed to drop testing database %v: %v", dbPath, err)
  772. }
  773. // Now, send the query.
  774. iter := client.Single().Query(ctx, Statement{SQL: "SELECT SingerId FROM Singers"})
  775. defer iter.Stop()
  776. if _, err := iter.Next(); err == nil {
  777. t.Errorf("client sends query to removed database successfully, want it to fail")
  778. }
  779. // Recreate database and table.
  780. dbName := dbPath[strings.LastIndex(dbPath, "/")+1:]
  781. op, err := admin.CreateDatabase(ctx, &adminpb.CreateDatabaseRequest{
  782. Parent: fmt.Sprintf("projects/%v/instances/%v", testProjectID, testInstanceID),
  783. CreateStatement: "CREATE DATABASE " + dbName,
  784. ExtraStatements: []string{
  785. `CREATE TABLE Singers (
  786. SingerId INT64 NOT NULL,
  787. FirstName STRING(1024),
  788. LastName STRING(1024),
  789. SingerInfo BYTES(MAX)
  790. ) PRIMARY KEY (SingerId)`,
  791. },
  792. })
  793. if err != nil {
  794. t.Fatalf("cannot recreate testing DB %v: %v", dbPath, err)
  795. }
  796. if _, err := op.Wait(ctx); err != nil {
  797. t.Fatalf("cannot recreate testing DB %v: %v", dbPath, err)
  798. }
  799. // Now, send the query again.
  800. iter = client.Single().Query(ctx, Statement{SQL: "SELECT SingerId FROM Singers"})
  801. defer iter.Stop()
  802. _, err = iter.Next()
  803. if err != nil && err != iterator.Done {
  804. t.Errorf("failed to send query to database %v: %v", dbPath, err)
  805. }
  806. }
// Test encoding/decoding non-struct Cloud Spanner types.
//
// Every table entry is written to column col of the "Types" table in a row
// whose RowID is the entry's index, then read back into a freshly allocated
// value whose Go type is that of want (or of val when want is nil). The
// decoded value must equal want (or val when want is nil).
func TestIntegration_BasicTypes(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()
	client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements)
	defer cleanup()

	// Parse errors are deliberately discarded below: the inputs are fixed
	// literals.
	t1, _ := time.Parse(time.RFC3339Nano, "2016-11-15T15:04:05.999999999Z")
	// Boundaries
	t2, _ := time.Parse(time.RFC3339Nano, "0001-01-01T00:00:00.000000000Z")
	t3, _ := time.Parse(time.RFC3339Nano, "9999-12-31T23:59:59.999999999Z")

	d1, _ := civil.ParseDate("2016-11-15")
	// Boundaries
	d2, _ := civil.ParseDate("0001-01-01")
	d3, _ := civil.ParseDate("9999-12-31")

	tests := []struct {
		col  string      // column of the Types table to write and read
		val  interface{} // value written to the column
		want interface{} // expected decoded value; nil means "same as val"
	}{
		{col: "String", val: ""},
		{col: "String", val: "", want: NullString{"", true}},
		{col: "String", val: "foo"},
		{col: "String", val: "foo", want: NullString{"foo", true}},
		{col: "String", val: NullString{"bar", true}, want: "bar"},
		{col: "String", val: NullString{"bar", false}, want: NullString{"", false}},
		{col: "String", val: nil, want: NullString{}},
		{col: "StringArray", val: []string(nil), want: []NullString(nil)},
		{col: "StringArray", val: []string{}, want: []NullString{}},
		{col: "StringArray", val: []string{"foo", "bar"}, want: []NullString{{"foo", true}, {"bar", true}}},
		{col: "StringArray", val: []NullString(nil)},
		{col: "StringArray", val: []NullString{}},
		{col: "StringArray", val: []NullString{{"foo", true}, {}}},
		{col: "Bytes", val: []byte{}},
		{col: "Bytes", val: []byte{1, 2, 3}},
		{col: "Bytes", val: []byte(nil)},
		{col: "BytesArray", val: [][]byte(nil)},
		{col: "BytesArray", val: [][]byte{}},
		{col: "BytesArray", val: [][]byte{{1}, {2, 3}}},
		{col: "Int64a", val: 0, want: int64(0)},
		{col: "Int64a", val: -1, want: int64(-1)},
		{col: "Int64a", val: 2, want: int64(2)},
		{col: "Int64a", val: int64(3)},
		{col: "Int64a", val: 4, want: NullInt64{4, true}},
		{col: "Int64a", val: NullInt64{5, true}, want: int64(5)},
		{col: "Int64a", val: NullInt64{6, true}, want: int64(6)},
		{col: "Int64a", val: NullInt64{7, false}, want: NullInt64{0, false}},
		{col: "Int64a", val: nil, want: NullInt64{}},
		{col: "Int64Array", val: []int(nil), want: []NullInt64(nil)},
		{col: "Int64Array", val: []int{}, want: []NullInt64{}},
		{col: "Int64Array", val: []int{1, 2}, want: []NullInt64{{1, true}, {2, true}}},
		{col: "Int64Array", val: []int64(nil), want: []NullInt64(nil)},
		{col: "Int64Array", val: []int64{}, want: []NullInt64{}},
		{col: "Int64Array", val: []int64{1, 2}, want: []NullInt64{{1, true}, {2, true}}},
		{col: "Int64Array", val: []NullInt64(nil)},
		{col: "Int64Array", val: []NullInt64{}},
		{col: "Int64Array", val: []NullInt64{{1, true}, {}}},
		{col: "Bool", val: false},
		{col: "Bool", val: true},
		{col: "Bool", val: false, want: NullBool{false, true}},
		{col: "Bool", val: true, want: NullBool{true, true}},
		{col: "Bool", val: NullBool{true, true}},
		{col: "Bool", val: NullBool{false, false}},
		{col: "Bool", val: nil, want: NullBool{}},
		{col: "BoolArray", val: []bool(nil), want: []NullBool(nil)},
		{col: "BoolArray", val: []bool{}, want: []NullBool{}},
		{col: "BoolArray", val: []bool{true, false}, want: []NullBool{{true, true}, {false, true}}},
		{col: "BoolArray", val: []NullBool(nil)},
		{col: "BoolArray", val: []NullBool{}},
		{col: "BoolArray", val: []NullBool{{false, true}, {true, true}, {}}},
		{col: "Float64", val: 0.0},
		{col: "Float64", val: 3.14},
		{col: "Float64", val: math.NaN()},
		{col: "Float64", val: math.Inf(1)},
		{col: "Float64", val: math.Inf(-1)},
		{col: "Float64", val: 2.78, want: NullFloat64{2.78, true}},
		{col: "Float64", val: NullFloat64{2.71, true}, want: 2.71},
		{col: "Float64", val: NullFloat64{1.41, true}, want: NullFloat64{1.41, true}},
		{col: "Float64", val: NullFloat64{0, false}},
		{col: "Float64", val: nil, want: NullFloat64{}},
		{col: "Float64Array", val: []float64(nil), want: []NullFloat64(nil)},
		{col: "Float64Array", val: []float64{}, want: []NullFloat64{}},
		{col: "Float64Array", val: []float64{2.72, 3.14, math.Inf(1)}, want: []NullFloat64{{2.72, true}, {3.14, true}, {math.Inf(1), true}}},
		{col: "Float64Array", val: []NullFloat64(nil)},
		{col: "Float64Array", val: []NullFloat64{}},
		{col: "Float64Array", val: []NullFloat64{{2.72, true}, {math.Inf(1), true}, {}}},
		{col: "Date", val: d1},
		{col: "Date", val: d1, want: NullDate{d1, true}},
		{col: "Date", val: NullDate{d1, true}},
		{col: "Date", val: NullDate{d1, true}, want: d1},
		{col: "Date", val: NullDate{civil.Date{}, false}},
		{col: "DateArray", val: []civil.Date(nil), want: []NullDate(nil)},
		{col: "DateArray", val: []civil.Date{}, want: []NullDate{}},
		{col: "DateArray", val: []civil.Date{d1, d2, d3}, want: []NullDate{{d1, true}, {d2, true}, {d3, true}}},
		{col: "Timestamp", val: t1},
		{col: "Timestamp", val: t1, want: NullTime{t1, true}},
		{col: "Timestamp", val: NullTime{t1, true}},
		{col: "Timestamp", val: NullTime{t1, true}, want: t1},
		{col: "Timestamp", val: NullTime{}},
		{col: "Timestamp", val: nil, want: NullTime{}},
		{col: "TimestampArray", val: []time.Time(nil), want: []NullTime(nil)},
		{col: "TimestampArray", val: []time.Time{}, want: []NullTime{}},
		{col: "TimestampArray", val: []time.Time{t1, t2, t3}, want: []NullTime{{t1, true}, {t2, true}, {t3, true}}},
	}

	// Write rows into table first.
	// The table index doubles as the RowID, so the read loop below can find
	// each entry's row again; do not reorder one loop without the other.
	var muts []*Mutation
	for i, test := range tests {
		muts = append(muts, InsertOrUpdate("Types", []string{"RowID", test.col}, []interface{}{i, test.val}))
	}
	if _, err := client.Apply(ctx, muts, ApplyAtLeastOnce()); err != nil {
		t.Fatal(err)
	}

	for i, test := range tests {
		row, err := client.Single().ReadRow(ctx, "Types", []interface{}{i}, []string{test.col})
		if err != nil {
			t.Fatalf("Unable to fetch row %v: %v", i, err)
		}
		// Create new instance of type of test.want.
		want := test.want
		if want == nil {
			want = test.val
		}
		// gotp is a pointer to a zero value of want's dynamic type; Column
		// decodes into it.
		gotp := reflect.New(reflect.TypeOf(want))
		if err := row.Column(0, gotp.Interface()); err != nil {
			t.Errorf("%d: col:%v val:%#v, %v", i, test.col, test.val, err)
			continue
		}
		got := reflect.Indirect(gotp).Interface()
		// One of the test cases is checking NaN handling. Given
		// NaN!=NaN, we can't use reflect to test for it.
		if isNaN(got) && isNaN(want) {
			continue
		}
		// Check non-NaN cases.
		if !testEqual(got, want) {
			t.Errorf("%d: col:%v val:%#v, got %#v, want %#v", i, test.col, test.val, got, want)
			continue
		}
	}
}
  946. // Test decoding Cloud Spanner STRUCT type.
  947. func TestIntegration_StructTypes(t *testing.T) {
  948. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
  949. defer cancel()
  950. client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements)
  951. defer cleanup()
  952. tests := []struct {
  953. q Statement
  954. want func(r *Row) error
  955. }{
  956. {
  957. q: Statement{SQL: `SELECT ARRAY(SELECT STRUCT(1, 2))`},
  958. want: func(r *Row) error {
  959. // Test STRUCT ARRAY decoding to []NullRow.
  960. var rows []NullRow
  961. if err := r.Column(0, &rows); err != nil {
  962. return err
  963. }
  964. if len(rows) != 1 {
  965. return fmt.Errorf("len(rows) = %d; want 1", len(rows))
  966. }
  967. if !rows[0].Valid {
  968. return fmt.Errorf("rows[0] is NULL")
  969. }
  970. var i, j int64
  971. if err := rows[0].Row.Columns(&i, &j); err != nil {
  972. return err
  973. }
  974. if i != 1 || j != 2 {
  975. return fmt.Errorf("got (%d,%d), want (1,2)", i, j)
  976. }
  977. return nil
  978. },
  979. },
  980. {
  981. q: Statement{SQL: `SELECT ARRAY(SELECT STRUCT(1 as foo, 2 as bar)) as col1`},
  982. want: func(r *Row) error {
  983. // Test Row.ToStruct.
  984. s := struct {
  985. Col1 []*struct {
  986. Foo int64 `spanner:"foo"`
  987. Bar int64 `spanner:"bar"`
  988. } `spanner:"col1"`
  989. }{}
  990. if err := r.ToStruct(&s); err != nil {
  991. return err
  992. }
  993. want := struct {
  994. Col1 []*struct {
  995. Foo int64 `spanner:"foo"`
  996. Bar int64 `spanner:"bar"`
  997. } `spanner:"col1"`
  998. }{
  999. Col1: []*struct {
  1000. Foo int64 `spanner:"foo"`
  1001. Bar int64 `spanner:"bar"`
  1002. }{
  1003. {
  1004. Foo: 1,
  1005. Bar: 2,
  1006. },
  1007. },
  1008. }
  1009. if !testEqual(want, s) {
  1010. return fmt.Errorf("unexpected decoding result: %v, want %v", s, want)
  1011. }
  1012. return nil
  1013. },
  1014. },
  1015. }
  1016. for i, test := range tests {
  1017. iter := client.Single().Query(ctx, test.q)
  1018. defer iter.Stop()
  1019. row, err := iter.Next()
  1020. if err != nil {
  1021. t.Errorf("%d: %v", i, err)
  1022. continue
  1023. }
  1024. if err := test.want(row); err != nil {
  1025. t.Errorf("%d: %v", i, err)
  1026. continue
  1027. }
  1028. }
  1029. }
  1030. func TestIntegration_StructParametersUnsupported(t *testing.T) {
  1031. ctx := context.Background()
  1032. client, _, cleanup := prepareIntegrationTest(ctx, t, nil)
  1033. defer cleanup()
  1034. for _, test := range []struct {
  1035. param interface{}
  1036. wantCode codes.Code
  1037. wantMsgPart string
  1038. }{
  1039. {
  1040. struct {
  1041. Field int
  1042. }{10},
  1043. codes.Unimplemented,
  1044. "Unsupported query shape: " +
  1045. "A struct value cannot be returned as a column value. " +
  1046. "Rewrite the query to flatten the struct fields in the result.",
  1047. },
  1048. {
  1049. []struct {
  1050. Field int
  1051. }{{10}, {20}},
  1052. codes.Unimplemented,
  1053. "Unsupported query shape: " +
  1054. "This query can return a null-valued array of struct, " +
  1055. "which is not supported by Spanner.",
  1056. },
  1057. } {
  1058. iter := client.Single().Query(ctx, Statement{
  1059. SQL: "SELECT @p",
  1060. Params: map[string]interface{}{"p": test.param},
  1061. })
  1062. _, err := iter.Next()
  1063. iter.Stop()
  1064. if msg, ok := matchError(err, test.wantCode, test.wantMsgPart); !ok {
  1065. t.Fatal(msg)
  1066. }
  1067. }
  1068. }
  1069. // Test queries of the form "SELECT expr".
  1070. func TestIntegration_QueryExpressions(t *testing.T) {
  1071. ctx := context.Background()
  1072. client, _, cleanup := prepareIntegrationTest(ctx, t, nil)
  1073. defer cleanup()
  1074. newRow := func(vals []interface{}) *Row {
  1075. row, err := NewRow(make([]string, len(vals)), vals)
  1076. if err != nil {
  1077. t.Fatal(err)
  1078. }
  1079. return row
  1080. }
  1081. tests := []struct {
  1082. expr string
  1083. want interface{}
  1084. }{
  1085. {"1", int64(1)},
  1086. {"[1, 2, 3]", []NullInt64{{1, true}, {2, true}, {3, true}}},
  1087. {"[1, NULL, 3]", []NullInt64{{1, true}, {0, false}, {3, true}}},
  1088. {"IEEE_DIVIDE(1, 0)", math.Inf(1)},
  1089. {"IEEE_DIVIDE(-1, 0)", math.Inf(-1)},
  1090. {"IEEE_DIVIDE(0, 0)", math.NaN()},
  1091. // TODO(jba): add IEEE_DIVIDE(0, 0) to the following array when we have a better equality predicate.
  1092. {"[IEEE_DIVIDE(1, 0), IEEE_DIVIDE(-1, 0)]", []NullFloat64{{math.Inf(1), true}, {math.Inf(-1), true}}},
  1093. {"ARRAY(SELECT AS STRUCT * FROM (SELECT 'a', 1) WHERE 0 = 1)", []NullRow{}},
  1094. {"ARRAY(SELECT STRUCT(1, 2))", []NullRow{{Row: *newRow([]interface{}{1, 2}), Valid: true}}},
  1095. }
  1096. for _, test := range tests {
  1097. iter := client.Single().Query(ctx, Statement{SQL: "SELECT " + test.expr})
  1098. defer iter.Stop()
  1099. row, err := iter.Next()
  1100. if err != nil {
  1101. t.Errorf("%q: %v", test.expr, err)
  1102. continue
  1103. }
  1104. // Create new instance of type of test.want.
  1105. gotp := reflect.New(reflect.TypeOf(test.want))
  1106. if err := row.Column(0, gotp.Interface()); err != nil {
  1107. t.Errorf("%q: Column returned error %v", test.expr, err)
  1108. continue
  1109. }
  1110. got := reflect.Indirect(gotp).Interface()
  1111. // TODO(jba): remove isNaN special case when we have a better equality predicate.
  1112. if isNaN(got) && isNaN(test.want) {
  1113. continue
  1114. }
  1115. if !testEqual(got, test.want) {
  1116. t.Errorf("%q\n got %#v\nwant %#v", test.expr, got, test.want)
  1117. }
  1118. }
  1119. }
  1120. func TestIntegration_QueryStats(t *testing.T) {
  1121. ctx := context.Background()
  1122. client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements)
  1123. defer cleanup()
  1124. accounts := []*Mutation{
  1125. Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}),
  1126. Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}),
  1127. }
  1128. if _, err := client.Apply(ctx, accounts, ApplyAtLeastOnce()); err != nil {
  1129. t.Fatal(err)
  1130. }
  1131. const sql = "SELECT Balance FROM Accounts"
  1132. qp, err := client.Single().AnalyzeQuery(ctx, Statement{sql, nil})
  1133. if err != nil {
  1134. t.Fatal(err)
  1135. }
  1136. if len(qp.PlanNodes) == 0 {
  1137. t.Error("got zero plan nodes, expected at least one")
  1138. }
  1139. iter := client.Single().QueryWithStats(ctx, Statement{sql, nil})
  1140. defer iter.Stop()
  1141. for {
  1142. _, err := iter.Next()
  1143. if err == iterator.Done {
  1144. break
  1145. }
  1146. if err != nil {
  1147. t.Fatal(err)
  1148. }
  1149. }
  1150. if iter.QueryPlan == nil {
  1151. t.Error("got nil QueryPlan, expected one")
  1152. }
  1153. if iter.QueryStats == nil {
  1154. t.Error("got nil QueryStats, expected some")
  1155. }
  1156. }
  1157. func TestIntegration_InvalidDatabase(t *testing.T) {
  1158. if testProjectID == "" {
  1159. t.Skip("Integration tests skipped: GCLOUD_TESTS_GOLANG_PROJECT_ID is missing")
  1160. }
  1161. ctx := context.Background()
  1162. dbPath := fmt.Sprintf("projects/%v/instances/%v/databases/invalid", testProjectID, testInstanceID)
  1163. c, err := createClient(ctx, dbPath)
  1164. // Client creation should succeed even if the database is invalid.
  1165. if err != nil {
  1166. t.Fatal(err)
  1167. }
  1168. _, err = c.Single().ReadRow(ctx, "TestTable", Key{1}, []string{"col1"})
  1169. if msg, ok := matchError(err, codes.NotFound, ""); !ok {
  1170. t.Fatal(msg)
  1171. }
  1172. }
  1173. func TestIntegration_ReadErrors(t *testing.T) {
  1174. ctx := context.Background()
  1175. client, _, cleanup := prepareIntegrationTest(ctx, t, readDBStatements)
  1176. defer cleanup()
  1177. // Read over invalid table fails
  1178. _, err := client.Single().ReadRow(ctx, "badTable", Key{1}, []string{"StringValue"})
  1179. if msg, ok := matchError(err, codes.NotFound, "badTable"); !ok {
  1180. t.Error(msg)
  1181. }
  1182. // Read over invalid column fails
  1183. _, err = client.Single().ReadRow(ctx, "TestTable", Key{1}, []string{"badcol"})
  1184. if msg, ok := matchError(err, codes.NotFound, "badcol"); !ok {
  1185. t.Error(msg)
  1186. }
  1187. // Invalid query fails
  1188. iter := client.Single().Query(ctx, Statement{SQL: "SELECT Apples AND Oranges"})
  1189. defer iter.Stop()
  1190. _, err = iter.Next()
  1191. if msg, ok := matchError(err, codes.InvalidArgument, "unrecognized name"); !ok {
  1192. t.Error(msg)
  1193. }
  1194. // Read should fail on cancellation.
  1195. cctx, cancel := context.WithCancel(ctx)
  1196. cancel()
  1197. _, err = client.Single().ReadRow(cctx, "TestTable", Key{1}, []string{"StringValue"})
  1198. if msg, ok := matchError(err, codes.Canceled, ""); !ok {
  1199. t.Error(msg)
  1200. }
  1201. // Read should fail if deadline exceeded.
  1202. dctx, cancel := context.WithTimeout(ctx, time.Nanosecond)
  1203. defer cancel()
  1204. <-dctx.Done()
  1205. _, err = client.Single().ReadRow(dctx, "TestTable", Key{1}, []string{"StringValue"})
  1206. if msg, ok := matchError(err, codes.DeadlineExceeded, ""); !ok {
  1207. t.Error(msg)
  1208. }
  1209. }
  1210. // Test TransactionRunner. Test that transactions are aborted and retried as expected.
  1211. func TestIntegration_TransactionRunner(t *testing.T) {
  1212. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
  1213. defer cancel()
  1214. client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements)
  1215. defer cleanup()
  1216. // Test 1: User error should abort the transaction.
  1217. _, _ = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
  1218. tx.BufferWrite([]*Mutation{
  1219. Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)})})
  1220. return errors.New("user error")
  1221. })
  1222. // Empty read.
  1223. rows, err := readAllTestTable(client.Single().Read(ctx, "Accounts", Key{1}, []string{"AccountId", "Nickname", "Balance"}))
  1224. if err != nil {
  1225. t.Fatal(err)
  1226. }
  1227. if got, want := len(rows), 0; got != want {
  1228. t.Errorf("Empty read, got %d, want %d.", got, want)
  1229. }
  1230. // Test 2: Expect abort and retry.
  1231. // We run two ReadWriteTransactions concurrently and make txn1 abort txn2 by committing writes to the column txn2 have read,
  1232. // and expect the following read to abort and txn2 retries.
  1233. // Set up two accounts
  1234. accounts := []*Mutation{
  1235. Insert("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(1), int64(0)}),
  1236. Insert("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(2), int64(1)}),
  1237. }
  1238. if _, err := client.Apply(ctx, accounts, ApplyAtLeastOnce()); err != nil {
  1239. t.Fatal(err)
  1240. }
  1241. var (
  1242. cTxn1Start = make(chan struct{})
  1243. cTxn1Commit = make(chan struct{})
  1244. cTxn2Start = make(chan struct{})
  1245. wg sync.WaitGroup
  1246. )
  1247. // read balance, check error if we don't expect abort.
  1248. readBalance := func(tx interface {
  1249. ReadRow(ctx context.Context, table string, key Key, columns []string) (*Row, error)
  1250. }, key int64, expectAbort bool) (int64, error) {
  1251. var b int64
  1252. r, e := tx.ReadRow(ctx, "Accounts", Key{int64(key)}, []string{"Balance"})
  1253. if e != nil {
  1254. if expectAbort && !isAbortErr(e) {
  1255. t.Errorf("ReadRow got %v, want Abort error.", e)
  1256. }
  1257. return b, e
  1258. }
  1259. if ce := r.Column(0, &b); ce != nil {
  1260. return b, ce
  1261. }
  1262. return b, nil
  1263. }
  1264. wg.Add(2)
  1265. // Txn 1
  1266. go func() {
  1267. defer wg.Done()
  1268. var once sync.Once
  1269. _, e := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
  1270. b, e := readBalance(tx, 1, false)
  1271. if e != nil {
  1272. return e
  1273. }
  1274. // txn 1 can abort, in that case we skip closing the channel on retry.
  1275. once.Do(func() { close(cTxn1Start) })
  1276. e = tx.BufferWrite([]*Mutation{
  1277. Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(1), int64(b + 1)})})
  1278. if e != nil {
  1279. return e
  1280. }
  1281. // Wait for second transaction.
  1282. <-cTxn2Start
  1283. return nil
  1284. })
  1285. close(cTxn1Commit)
  1286. if e != nil {
  1287. t.Errorf("Transaction 1 commit, got %v, want nil.", e)
  1288. }
  1289. }()
  1290. // Txn 2
  1291. go func() {
  1292. // Wait until txn 1 starts.
  1293. <-cTxn1Start
  1294. defer wg.Done()
  1295. var (
  1296. once sync.Once
  1297. b1 int64
  1298. b2 int64
  1299. e error
  1300. )
  1301. _, e = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
  1302. if b1, e = readBalance(tx, 1, false); e != nil {
  1303. return e
  1304. }
  1305. // Skip closing channel on retry.
  1306. once.Do(func() { close(cTxn2Start) })
  1307. // Wait until txn 1 successfully commits.
  1308. <-cTxn1Commit
  1309. // Txn1 has committed and written a balance to the account.
  1310. // Now this transaction (txn2) reads and re-writes the balance.
  1311. // The first time through, it will abort because it overlaps with txn1.
  1312. // Then it will retry after txn1 commits, and succeed.
  1313. if b2, e = readBalance(tx, 2, true); e != nil {
  1314. return e
  1315. }
  1316. return tx.BufferWrite([]*Mutation{
  1317. Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(2), int64(b1 + b2)})})
  1318. })
  1319. if e != nil {
  1320. t.Errorf("Transaction 2 commit, got %v, want nil.", e)
  1321. }
  1322. }()
  1323. wg.Wait()
  1324. // Check that both transactions' effects are visible.
  1325. for i := int64(1); i <= int64(2); i++ {
  1326. if b, e := readBalance(client.Single(), i, false); e != nil {
  1327. t.Fatalf("ReadBalance for key %d error %v.", i, e)
  1328. } else if b != i {
  1329. t.Errorf("Balance for key %d, got %d, want %d.", i, b, i)
  1330. }
  1331. }
  1332. }
  1333. // Test PartitionQuery of BatchReadOnlyTransaction, create partitions then
  1334. // serialize and deserialize both transaction and partition to be used in
  1335. // execution on another client, and compare results.
  1336. func TestIntegration_BatchQuery(t *testing.T) {
  1337. // Set up testing environment.
  1338. var (
  1339. client2 *Client
  1340. err error
  1341. )
  1342. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
  1343. defer cancel()
  1344. client, dbPath, cleanup := prepareIntegrationTest(ctx, t, simpleDBStatements)
  1345. defer cleanup()
  1346. if err = populate(ctx, client); err != nil {
  1347. t.Fatal(err)
  1348. }
  1349. if client2, err = createClient(ctx, dbPath); err != nil {
  1350. t.Fatal(err)
  1351. }
  1352. defer client2.Close()
  1353. // PartitionQuery
  1354. var (
  1355. txn *BatchReadOnlyTransaction
  1356. partitions []*Partition
  1357. stmt = Statement{SQL: "SELECT * FROM test;"}
  1358. )
  1359. if txn, err = client.BatchReadOnlyTransaction(ctx, StrongRead()); err != nil {
  1360. t.Fatal(err)
  1361. }
  1362. defer txn.Cleanup(ctx)
  1363. if partitions, err = txn.PartitionQuery(ctx, stmt, PartitionOptions{0, 3}); err != nil {
  1364. t.Fatal(err)
  1365. }
  1366. // Reconstruct BatchReadOnlyTransactionID and execute partitions
  1367. var (
  1368. tid2 BatchReadOnlyTransactionID
  1369. data []byte
  1370. gotResult bool // if we get matching result from two separate txns
  1371. )
  1372. if data, err = txn.ID.MarshalBinary(); err != nil {
  1373. t.Fatalf("encoding failed %v", err)
  1374. }
  1375. if err = tid2.UnmarshalBinary(data); err != nil {
  1376. t.Fatalf("decoding failed %v", err)
  1377. }
  1378. txn2 := client2.BatchReadOnlyTransactionFromID(tid2)
  1379. // Execute Partitions and compare results
  1380. for i, p := range partitions {
  1381. iter := txn.Execute(ctx, p)
  1382. defer iter.Stop()
  1383. p2 := serdesPartition(t, i, p)
  1384. iter2 := txn2.Execute(ctx, &p2)
  1385. defer iter2.Stop()
  1386. row1, err1 := iter.Next()
  1387. row2, err2 := iter2.Next()
  1388. if err1 != err2 {
  1389. t.Fatalf("execution failed for different reasons: %v, %v", err1, err2)
  1390. continue
  1391. }
  1392. if !testEqual(row1, row2) {
  1393. t.Fatalf("execution returned different values: %v, %v", row1, row2)
  1394. continue
  1395. }
  1396. if row1 == nil {
  1397. continue
  1398. }
  1399. var a, b string
  1400. if err = row1.Columns(&a, &b); err != nil {
  1401. t.Fatalf("failed to parse row %v", err)
  1402. continue
  1403. }
  1404. if a == str1 && b == str2 {
  1405. gotResult = true
  1406. }
  1407. }
  1408. if !gotResult {
  1409. t.Fatalf("execution didn't return expected values")
  1410. }
  1411. }
  1412. // Test PartitionRead of BatchReadOnlyTransaction, similar to TestBatchQuery
  1413. func TestIntegration_BatchRead(t *testing.T) {
  1414. // Set up testing environment.
  1415. var (
  1416. client2 *Client
  1417. err error
  1418. )
  1419. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
  1420. defer cancel()
  1421. client, dbPath, cleanup := prepareIntegrationTest(ctx, t, simpleDBStatements)
  1422. defer cleanup()
  1423. if err = populate(ctx, client); err != nil {
  1424. t.Fatal(err)
  1425. }
  1426. if client2, err = createClient(ctx, dbPath); err != nil {
  1427. t.Fatal(err)
  1428. }
  1429. defer client2.Close()
  1430. // PartitionRead
  1431. var (
  1432. txn *BatchReadOnlyTransaction
  1433. partitions []*Partition
  1434. )
  1435. if txn, err = client.BatchReadOnlyTransaction(ctx, StrongRead()); err != nil {
  1436. t.Fatal(err)
  1437. }
  1438. defer txn.Cleanup(ctx)
  1439. if partitions, err = txn.PartitionRead(ctx, "test", AllKeys(), simpleDBTableColumns, PartitionOptions{0, 3}); err != nil {
  1440. t.Fatal(err)
  1441. }
  1442. // Reconstruct BatchReadOnlyTransactionID and execute partitions
  1443. var (
  1444. tid2 BatchReadOnlyTransactionID
  1445. data []byte
  1446. gotResult bool // if we get matching result from two separate txns
  1447. )
  1448. if data, err = txn.ID.MarshalBinary(); err != nil {
  1449. t.Fatalf("encoding failed %v", err)
  1450. }
  1451. if err = tid2.UnmarshalBinary(data); err != nil {
  1452. t.Fatalf("decoding failed %v", err)
  1453. }
  1454. txn2 := client2.BatchReadOnlyTransactionFromID(tid2)
  1455. // Execute Partitions and compare results
  1456. for i, p := range partitions {
  1457. iter := txn.Execute(ctx, p)
  1458. defer iter.Stop()
  1459. p2 := serdesPartition(t, i, p)
  1460. iter2 := txn2.Execute(ctx, &p2)
  1461. defer iter2.Stop()
  1462. row1, err1 := iter.Next()
  1463. row2, err2 := iter2.Next()
  1464. if err1 != err2 {
  1465. t.Fatalf("execution failed for different reasons: %v, %v", err1, err2)
  1466. continue
  1467. }
  1468. if !testEqual(row1, row2) {
  1469. t.Fatalf("execution returned different values: %v, %v", row1, row2)
  1470. continue
  1471. }
  1472. if row1 == nil {
  1473. continue
  1474. }
  1475. var a, b string
  1476. if err = row1.Columns(&a, &b); err != nil {
  1477. t.Fatalf("failed to parse row %v", err)
  1478. continue
  1479. }
  1480. if a == str1 && b == str2 {
  1481. gotResult = true
  1482. }
  1483. }
  1484. if !gotResult {
  1485. t.Fatalf("execution didn't return expected values")
  1486. }
  1487. }
  1488. // Test normal txReadEnv method on BatchReadOnlyTransaction.
  1489. func TestIntegration_BROTNormal(t *testing.T) {
  1490. // Set up testing environment and create txn.
  1491. var (
  1492. txn *BatchReadOnlyTransaction
  1493. err error
  1494. row *Row
  1495. i int64
  1496. )
  1497. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
  1498. defer cancel()
  1499. client, _, cleanup := prepareIntegrationTest(ctx, t, simpleDBStatements)
  1500. defer cleanup()
  1501. if txn, err = client.BatchReadOnlyTransaction(ctx, StrongRead()); err != nil {
  1502. t.Fatal(err)
  1503. }
  1504. defer txn.Cleanup(ctx)
  1505. if _, err := txn.PartitionRead(ctx, "test", AllKeys(), simpleDBTableColumns, PartitionOptions{0, 3}); err != nil {
  1506. t.Fatal(err)
  1507. }
  1508. // Normal query should work with BatchReadOnlyTransaction
  1509. stmt2 := Statement{SQL: "SELECT 1"}
  1510. iter := txn.Query(ctx, stmt2)
  1511. defer iter.Stop()
  1512. row, err = iter.Next()
  1513. if err != nil {
  1514. t.Errorf("query failed with %v", err)
  1515. }
  1516. if err = row.Columns(&i); err != nil {
  1517. t.Errorf("failed to parse row %v", err)
  1518. }
  1519. }
  1520. func TestIntegration_CommitTimestamp(t *testing.T) {
  1521. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
  1522. defer cancel()
  1523. client, _, cleanup := prepareIntegrationTest(ctx, t, ctsDBStatements)
  1524. defer cleanup()
  1525. type testTableRow struct {
  1526. Key string
  1527. Ts NullTime
  1528. }
  1529. var (
  1530. cts1, cts2, ts1, ts2 time.Time
  1531. err error
  1532. )
  1533. // Apply mutation in sequence, expect to see commit timestamp in good order, check also the commit timestamp returned
  1534. for _, it := range []struct {
  1535. k string
  1536. t *time.Time
  1537. }{
  1538. {"a", &cts1},
  1539. {"b", &cts2},
  1540. } {
  1541. tt := testTableRow{Key: it.k, Ts: NullTime{CommitTimestamp, true}}
  1542. m, err := InsertStruct("TestTable", tt)
  1543. if err != nil {
  1544. t.Fatal(err)
  1545. }
  1546. *it.t, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce())
  1547. if err != nil {
  1548. t.Fatal(err)
  1549. }
  1550. }
  1551. txn := client.ReadOnlyTransaction()
  1552. for _, it := range []struct {
  1553. k string
  1554. t *time.Time
  1555. }{
  1556. {"a", &ts1},
  1557. {"b", &ts2},
  1558. } {
  1559. if r, e := txn.ReadRow(ctx, "TestTable", Key{it.k}, []string{"Ts"}); e != nil {
  1560. t.Fatal(err)
  1561. } else {
  1562. var got testTableRow
  1563. if err := r.ToStruct(&got); err != nil {
  1564. t.Fatal(err)
  1565. }
  1566. *it.t = got.Ts.Time
  1567. }
  1568. }
  1569. if !cts1.Equal(ts1) {
  1570. t.Errorf("Expect commit timestamp returned and read to match for txn1, got %v and %v.", cts1, ts1)
  1571. }
  1572. if !cts2.Equal(ts2) {
  1573. t.Errorf("Expect commit timestamp returned and read to match for txn2, got %v and %v.", cts2, ts2)
  1574. }
  1575. // Try writing a timestamp in the future to commit timestamp, expect error
  1576. _, err = client.Apply(ctx, []*Mutation{InsertOrUpdate("TestTable", []string{"Key", "Ts"}, []interface{}{"a", time.Now().Add(time.Hour)})}, ApplyAtLeastOnce())
  1577. if msg, ok := matchError(err, codes.FailedPrecondition, "Cannot write timestamps in the future"); !ok {
  1578. t.Error(msg)
  1579. }
  1580. }
  1581. func TestIntegration_DML(t *testing.T) {
  1582. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
  1583. defer cancel()
  1584. client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements)
  1585. defer cleanup()
  1586. // Function that reads a single row's first name from within a transaction.
  1587. readFirstName := func(tx *ReadWriteTransaction, key int) (string, error) {
  1588. row, err := tx.ReadRow(ctx, "Singers", Key{key}, []string{"FirstName"})
  1589. if err != nil {
  1590. return "", err
  1591. }
  1592. var fn string
  1593. if err := row.Column(0, &fn); err != nil {
  1594. return "", err
  1595. }
  1596. return fn, nil
  1597. }
  1598. // Function that reads multiple rows' first names from outside a read/write transaction.
  1599. readFirstNames := func(keys ...int) []string {
  1600. var ks []KeySet
  1601. for _, k := range keys {
  1602. ks = append(ks, Key{k})
  1603. }
  1604. iter := client.Single().Read(ctx, "Singers", KeySets(ks...), []string{"FirstName"})
  1605. var got []string
  1606. var fn string
  1607. err := iter.Do(func(row *Row) error {
  1608. if err := row.Column(0, &fn); err != nil {
  1609. return err
  1610. }
  1611. got = append(got, fn)
  1612. return nil
  1613. })
  1614. if err != nil {
  1615. t.Fatalf("readFirstNames(%v): %v", keys, err)
  1616. }
  1617. return got
  1618. }
  1619. // Use ReadWriteTransaction.Query to execute a DML statement.
  1620. _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
  1621. iter := tx.Query(ctx, Statement{
  1622. SQL: `INSERT INTO Singers (SingerId, FirstName, LastName) VALUES (1, "Umm", "Kulthum")`,
  1623. })
  1624. defer iter.Stop()
  1625. if row, err := iter.Next(); err != iterator.Done {
  1626. t.Fatalf("got results from iterator, want none: %#v, err = %v\n", row, err)
  1627. }
  1628. if iter.RowCount != 1 {
  1629. t.Errorf("row count: got %d, want 1", iter.RowCount)
  1630. }
  1631. // The results of the DML statement should be visible to the transaction.
  1632. got, err := readFirstName(tx, 1)
  1633. if err != nil {
  1634. return err
  1635. }
  1636. if want := "Umm"; got != want {
  1637. t.Errorf("got %q, want %q", got, want)
  1638. }
  1639. return nil
  1640. })
  1641. if err != nil {
  1642. t.Fatal(err)
  1643. }
  1644. // Use ReadWriteTransaction.Update to execute a DML statement.
  1645. _, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
  1646. count, err := tx.Update(ctx, Statement{
  1647. SQL: `Insert INTO Singers (SingerId, FirstName, LastName) VALUES (2, "Eduard", "Khil")`,
  1648. })
  1649. if err != nil {
  1650. t.Fatal(err)
  1651. }
  1652. if count != 1 {
  1653. t.Errorf("row count: got %d, want 1", count)
  1654. }
  1655. got, err := readFirstName(tx, 2)
  1656. if err != nil {
  1657. return err
  1658. }
  1659. if want := "Eduard"; got != want {
  1660. t.Errorf("got %q, want %q", got, want)
  1661. }
  1662. return nil
  1663. })
  1664. if err != nil {
  1665. t.Fatal(err)
  1666. }
  1667. // Roll back a DML statement and confirm that it didn't happen.
  1668. var fail = errors.New("fail")
  1669. _, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
  1670. _, err := tx.Update(ctx, Statement{
  1671. SQL: `INSERT INTO Singers (SingerId, FirstName, LastName) VALUES (3, "Audra", "McDonald")`,
  1672. })
  1673. if err != nil {
  1674. return err
  1675. }
  1676. return fail
  1677. })
  1678. if err != fail {
  1679. t.Fatalf("rolling back: got error %v, want the error 'fail'", err)
  1680. }
  1681. _, err = client.Single().ReadRow(ctx, "Singers", Key{3}, []string{"FirstName"})
  1682. if got, want := ErrCode(err), codes.NotFound; got != want {
  1683. t.Errorf("got %s, want %s", got, want)
  1684. }
  1685. // Run two DML statements in the same transaction.
  1686. _, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
  1687. _, err := tx.Update(ctx, Statement{SQL: `UPDATE Singers SET FirstName = "Oum" WHERE SingerId = 1`})
  1688. if err != nil {
  1689. return err
  1690. }
  1691. _, err = tx.Update(ctx, Statement{SQL: `UPDATE Singers SET FirstName = "Eddie" WHERE SingerId = 2`})
  1692. if err != nil {
  1693. return err
  1694. }
  1695. return nil
  1696. })
  1697. if err != nil {
  1698. t.Fatal(err)
  1699. }
  1700. got := readFirstNames(1, 2)
  1701. want := []string{"Oum", "Eddie"}
  1702. if !testEqual(got, want) {
  1703. t.Errorf("got %v, want %v", got, want)
  1704. }
  1705. // Run a DML statement and an ordinary mutation in the same transaction.
  1706. _, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
  1707. _, err := tx.Update(ctx, Statement{
  1708. SQL: `INSERT INTO Singers (SingerId, FirstName, LastName) VALUES (3, "Audra", "McDonald")`,
  1709. })
  1710. if err != nil {
  1711. return err
  1712. }
  1713. tx.BufferWrite([]*Mutation{
  1714. Insert("Singers", []string{"SingerId", "FirstName", "LastName"},
  1715. []interface{}{4, "Andy", "Irvine"}),
  1716. })
  1717. return nil
  1718. })
  1719. if err != nil {
  1720. t.Fatal(err)
  1721. }
  1722. got = readFirstNames(3, 4)
  1723. want = []string{"Audra", "Andy"}
  1724. if !testEqual(got, want) {
  1725. t.Errorf("got %v, want %v", got, want)
  1726. }
  1727. // Attempt to run a query using update.
  1728. _, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
  1729. _, err := tx.Update(ctx, Statement{SQL: `SELECT FirstName from Singers`})
  1730. return err
  1731. })
  1732. if got, want := ErrCode(err), codes.InvalidArgument; got != want {
  1733. t.Errorf("got %s, want %s", got, want)
  1734. }
  1735. }
  1736. func TestIntegration_StructParametersBind(t *testing.T) {
  1737. t.Parallel()
  1738. ctx := context.Background()
  1739. client, _, cleanup := prepareIntegrationTest(ctx, t, nil)
  1740. defer cleanup()
  1741. type tRow []interface{}
  1742. type tRows []struct{ trow tRow }
  1743. type allFields struct {
  1744. Stringf string
  1745. Intf int
  1746. Boolf bool
  1747. Floatf float64
  1748. Bytef []byte
  1749. Timef time.Time
  1750. Datef civil.Date
  1751. }
  1752. allColumns := []string{
  1753. "Stringf",
  1754. "Intf",
  1755. "Boolf",
  1756. "Floatf",
  1757. "Bytef",
  1758. "Timef",
  1759. "Datef",
  1760. }
  1761. s1 := allFields{"abc", 300, false, 3.45, []byte("foo"), t1, d1}
  1762. s2 := allFields{"def", -300, false, -3.45, []byte("bar"), t2, d2}
  1763. dynamicStructType := reflect.StructOf([]reflect.StructField{
  1764. {Name: "A", Type: reflect.TypeOf(t1), Tag: `spanner:"ff1"`},
  1765. })
  1766. s3 := reflect.New(dynamicStructType)
  1767. s3.Elem().Field(0).Set(reflect.ValueOf(t1))
  1768. for i, test := range []struct {
  1769. param interface{}
  1770. sql string
  1771. cols []string
  1772. trows tRows
  1773. }{
  1774. // Struct value.
  1775. {
  1776. s1,
  1777. "SELECT" +
  1778. " @p.Stringf," +
  1779. " @p.Intf," +
  1780. " @p.Boolf," +
  1781. " @p.Floatf," +
  1782. " @p.Bytef," +
  1783. " @p.Timef," +
  1784. " @p.Datef",
  1785. allColumns,
  1786. tRows{
  1787. {tRow{"abc", 300, false, 3.45, []byte("foo"), t1, d1}},
  1788. },
  1789. },
  1790. // Array of struct value.
  1791. {
  1792. []allFields{s1, s2},
  1793. "SELECT * FROM UNNEST(@p)",
  1794. allColumns,
  1795. tRows{
  1796. {tRow{"abc", 300, false, 3.45, []byte("foo"), t1, d1}},
  1797. {tRow{"def", -300, false, -3.45, []byte("bar"), t2, d2}},
  1798. },
  1799. },
  1800. // Null struct.
  1801. {
  1802. (*allFields)(nil),
  1803. "SELECT @p IS NULL",
  1804. []string{""},
  1805. tRows{
  1806. {tRow{true}},
  1807. },
  1808. },
  1809. // Null Array of struct.
  1810. {
  1811. []allFields(nil),
  1812. "SELECT @p IS NULL",
  1813. []string{""},
  1814. tRows{
  1815. {tRow{true}},
  1816. },
  1817. },
  1818. // Empty struct.
  1819. {
  1820. struct{}{},
  1821. "SELECT @p IS NULL ",
  1822. []string{""},
  1823. tRows{
  1824. {tRow{false}},
  1825. },
  1826. },
  1827. // Empty array of struct.
  1828. {
  1829. []allFields{},
  1830. "SELECT * FROM UNNEST(@p) ",
  1831. allColumns,
  1832. tRows{},
  1833. },
  1834. // Struct with duplicate fields.
  1835. {
  1836. struct {
  1837. A int `spanner:"field"`
  1838. B int `spanner:"field"`
  1839. }{10, 20},
  1840. "SELECT * FROM UNNEST([@p]) ",
  1841. []string{"field", "field"},
  1842. tRows{
  1843. {tRow{10, 20}},
  1844. },
  1845. },
  1846. // Struct with unnamed fields.
  1847. {
  1848. struct {
  1849. A string `spanner:""`
  1850. }{"hello"},
  1851. "SELECT * FROM UNNEST([@p]) ",
  1852. []string{""},
  1853. tRows{
  1854. {tRow{"hello"}},
  1855. },
  1856. },
  1857. // Mixed struct.
  1858. {
  1859. struct {
  1860. DynamicStructField interface{} `spanner:"f1"`
  1861. ArrayStructField []*allFields `spanner:"f2"`
  1862. }{
  1863. DynamicStructField: s3.Interface(),
  1864. ArrayStructField: []*allFields{nil},
  1865. },
  1866. "SELECT @p.f1.ff1, ARRAY_LENGTH(@p.f2), @p.f2[OFFSET(0)] IS NULL ",
  1867. []string{"ff1", "", ""},
  1868. tRows{
  1869. {tRow{t1, 1, true}},
  1870. },
  1871. },
  1872. } {
  1873. iter := client.Single().Query(ctx, Statement{
  1874. SQL: test.sql,
  1875. Params: map[string]interface{}{"p": test.param},
  1876. })
  1877. var gotRows []*Row
  1878. err := iter.Do(func(r *Row) error {
  1879. gotRows = append(gotRows, r)
  1880. return nil
  1881. })
  1882. if err != nil {
  1883. t.Errorf("Failed to execute test case %d, error: %v", i, err)
  1884. }
  1885. var wantRows []*Row
  1886. for j, row := range test.trows {
  1887. r, err := NewRow(test.cols, row.trow)
  1888. if err != nil {
  1889. t.Errorf("Invalid row %d in test case %d", j, i)
  1890. }
  1891. wantRows = append(wantRows, r)
  1892. }
  1893. if !testEqual(gotRows, wantRows) {
  1894. t.Errorf("%d: Want result %v, got result %v", i, wantRows, gotRows)
  1895. }
  1896. }
  1897. }
  1898. func TestIntegration_PDML(t *testing.T) {
  1899. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
  1900. defer cancel()
  1901. client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements)
  1902. defer cleanup()
  1903. columns := []string{"SingerId", "FirstName", "LastName"}
  1904. // Populate the Singers table.
  1905. var muts []*Mutation
  1906. for _, row := range [][]interface{}{
  1907. {1, "Umm", "Kulthum"},
  1908. {2, "Eduard", "Khil"},
  1909. {3, "Audra", "McDonald"},
  1910. } {
  1911. muts = append(muts, Insert("Singers", columns, row))
  1912. }
  1913. if _, err := client.Apply(ctx, muts); err != nil {
  1914. t.Fatal(err)
  1915. }
  1916. // Identifiers in PDML statements must be fully qualified.
  1917. // TODO(jba): revisit the above.
  1918. count, err := client.PartitionedUpdate(ctx, Statement{
  1919. SQL: `UPDATE Singers SET Singers.FirstName = "changed" WHERE Singers.SingerId >= 1 AND Singers.SingerId <= 3`,
  1920. })
  1921. if err != nil {
  1922. t.Fatal(err)
  1923. }
  1924. if want := int64(3); count != want {
  1925. t.Errorf("got %d, want %d", count, want)
  1926. }
  1927. got, err := readAll(client.Single().Read(ctx, "Singers", AllKeys(), columns))
  1928. if err != nil {
  1929. t.Fatal(err)
  1930. }
  1931. want := [][]interface{}{
  1932. {int64(1), "changed", "Kulthum"},
  1933. {int64(2), "changed", "Khil"},
  1934. {int64(3), "changed", "McDonald"},
  1935. }
  1936. if !testEqual(got, want) {
  1937. t.Errorf("\ngot %v\nwant%v", got, want)
  1938. }
  1939. }
  1940. func TestBatchDML(t *testing.T) {
  1941. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
  1942. defer cancel()
  1943. client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements)
  1944. defer cleanup()
  1945. columns := []string{"SingerId", "FirstName", "LastName"}
  1946. // Populate the Singers table.
  1947. var muts []*Mutation
  1948. for _, row := range [][]interface{}{
  1949. {1, "Umm", "Kulthum"},
  1950. {2, "Eduard", "Khil"},
  1951. {3, "Audra", "McDonald"},
  1952. } {
  1953. muts = append(muts, Insert("Singers", columns, row))
  1954. }
  1955. if _, err := client.Apply(ctx, muts); err != nil {
  1956. t.Fatal(err)
  1957. }
  1958. var counts []int64
  1959. _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) {
  1960. counts, err = tx.BatchUpdate(ctx, []Statement{
  1961. {SQL: `UPDATE Singers SET Singers.FirstName = "changed 1" WHERE Singers.SingerId = 1`},
  1962. {SQL: `UPDATE Singers SET Singers.FirstName = "changed 2" WHERE Singers.SingerId = 2`},
  1963. {SQL: `UPDATE Singers SET Singers.FirstName = "changed 3" WHERE Singers.SingerId = 3`},
  1964. })
  1965. return err
  1966. })
  1967. if err != nil {
  1968. t.Fatal(err)
  1969. }
  1970. if want := []int64{1, 1, 1}; !testEqual(counts, want) {
  1971. t.Fatalf("got %d, want %d", counts, want)
  1972. }
  1973. got, err := readAll(client.Single().Read(ctx, "Singers", AllKeys(), columns))
  1974. if err != nil {
  1975. t.Fatal(err)
  1976. }
  1977. want := [][]interface{}{
  1978. {int64(1), "changed 1", "Kulthum"},
  1979. {int64(2), "changed 2", "Khil"},
  1980. {int64(3), "changed 3", "McDonald"},
  1981. }
  1982. if !testEqual(got, want) {
  1983. t.Errorf("\ngot %v\nwant%v", got, want)
  1984. }
  1985. }
  1986. func TestBatchDML_NoStatements(t *testing.T) {
  1987. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
  1988. defer cancel()
  1989. client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements)
  1990. defer cleanup()
  1991. _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) {
  1992. _, err = tx.BatchUpdate(ctx, []Statement{})
  1993. return err
  1994. })
  1995. if err == nil {
  1996. t.Fatal("expected error, got nil")
  1997. }
  1998. if s, ok := status.FromError(err); ok {
  1999. if s.Code() != codes.InvalidArgument {
  2000. t.Fatalf("expected InvalidArgument, got %v", err)
  2001. }
  2002. } else {
  2003. t.Fatalf("expected InvalidArgument, got %v", err)
  2004. }
  2005. }
  2006. func TestBatchDML_TwoStatements(t *testing.T) {
  2007. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
  2008. defer cancel()
  2009. client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements)
  2010. defer cleanup()
  2011. columns := []string{"SingerId", "FirstName", "LastName"}
  2012. // Populate the Singers table.
  2013. var muts []*Mutation
  2014. for _, row := range [][]interface{}{
  2015. {1, "Umm", "Kulthum"},
  2016. {2, "Eduard", "Khil"},
  2017. {3, "Audra", "McDonald"},
  2018. } {
  2019. muts = append(muts, Insert("Singers", columns, row))
  2020. }
  2021. if _, err := client.Apply(ctx, muts); err != nil {
  2022. t.Fatal(err)
  2023. }
  2024. var updateCount int64
  2025. var batchCounts []int64
  2026. _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) {
  2027. batchCounts, err = tx.BatchUpdate(ctx, []Statement{
  2028. {SQL: `UPDATE Singers SET Singers.FirstName = "changed 1" WHERE Singers.SingerId = 1`},
  2029. {SQL: `UPDATE Singers SET Singers.FirstName = "changed 2" WHERE Singers.SingerId = 2`},
  2030. {SQL: `UPDATE Singers SET Singers.FirstName = "changed 3" WHERE Singers.SingerId = 3`},
  2031. })
  2032. if err != nil {
  2033. return err
  2034. }
  2035. updateCount, err = tx.Update(ctx, Statement{SQL: `UPDATE Singers SET Singers.FirstName = "changed 1" WHERE Singers.SingerId = 1`})
  2036. return err
  2037. })
  2038. if err != nil {
  2039. t.Fatal(err)
  2040. }
  2041. if want := []int64{1, 1, 1}; !testEqual(batchCounts, want) {
  2042. t.Fatalf("got %d, want %d", batchCounts, want)
  2043. }
  2044. if updateCount != 1 {
  2045. t.Fatalf("got %v, want 1", updateCount)
  2046. }
  2047. }
  2048. // TODO(deklerk) this currently does not work because the transaction appears to
  2049. // get rolled back after a single statement fails. b/120158761
  2050. func TestBatchDML_Error(t *testing.T) {
  2051. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
  2052. defer cancel()
  2053. client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements)
  2054. defer cleanup()
  2055. columns := []string{"SingerId", "FirstName", "LastName"}
  2056. // Populate the Singers table.
  2057. var muts []*Mutation
  2058. for _, row := range [][]interface{}{
  2059. {1, "Umm", "Kulthum"},
  2060. {2, "Eduard", "Khil"},
  2061. {3, "Audra", "McDonald"},
  2062. } {
  2063. muts = append(muts, Insert("Singers", columns, row))
  2064. }
  2065. if _, err := client.Apply(ctx, muts); err != nil {
  2066. t.Fatal(err)
  2067. }
  2068. _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) {
  2069. counts, err := tx.BatchUpdate(ctx, []Statement{
  2070. {SQL: `UPDATE Singers SET Singers.FirstName = "changed 1" WHERE Singers.SingerId = 1`},
  2071. {SQL: `some illegal statement`},
  2072. {SQL: `UPDATE Singers SET Singers.FirstName = "changed 3" WHERE Singers.SingerId = 3`},
  2073. })
  2074. if err == nil {
  2075. t.Fatal("expected err, got nil")
  2076. }
  2077. if want := []int64{1}; !testEqual(counts, want) {
  2078. t.Fatalf("got %d, want %d", counts, want)
  2079. }
  2080. got, err := readAll(tx.Read(ctx, "Singers", AllKeys(), columns))
  2081. if err != nil {
  2082. t.Fatal(err)
  2083. }
  2084. want := [][]interface{}{
  2085. {int64(1), "changed 1", "Kulthum"},
  2086. {int64(2), "Eduard", "Khil"},
  2087. {int64(3), "Audra", "McDonald"},
  2088. }
  2089. if !testEqual(got, want) {
  2090. t.Errorf("\ngot %v\nwant%v", got, want)
  2091. }
  2092. return nil
  2093. })
  2094. if err != nil {
  2095. t.Fatal(err)
  2096. }
  2097. }
  2098. // Prepare initializes Cloud Spanner testing DB and clients.
  2099. func prepareIntegrationTest(ctx context.Context, t *testing.T, statements []string) (*Client, string, func()) {
  2100. if admin == nil {
  2101. t.Skip("Integration tests skipped")
  2102. }
  2103. // Construct a unique test DB name.
  2104. dbName := dbNameSpace.New()
  2105. dbPath := fmt.Sprintf("projects/%v/instances/%v/databases/%v", testProjectID, testInstanceID, dbName)
  2106. // Create database and tables.
  2107. op, err := admin.CreateDatabase(ctx, &adminpb.CreateDatabaseRequest{
  2108. Parent: fmt.Sprintf("projects/%v/instances/%v", testProjectID, testInstanceID),
  2109. CreateStatement: "CREATE DATABASE " + dbName,
  2110. ExtraStatements: statements,
  2111. })
  2112. if err != nil {
  2113. t.Fatalf("cannot create testing DB %v: %v", dbPath, err)
  2114. }
  2115. if _, err := op.Wait(ctx); err != nil {
  2116. t.Fatalf("cannot create testing DB %v: %v", dbPath, err)
  2117. }
  2118. client, err := createClient(ctx, dbPath)
  2119. if err != nil {
  2120. t.Fatalf("cannot create data client on DB %v: %v", dbPath, err)
  2121. }
  2122. return client, dbPath, func() {
  2123. client.Close()
  2124. if err := admin.DropDatabase(ctx, &adminpb.DropDatabaseRequest{Database: dbPath}); err != nil {
  2125. t.Logf("failed to drop database %s (error %v), might need a manual removal",
  2126. dbPath, err)
  2127. }
  2128. }
  2129. }
  2130. func cleanupDatabases() {
  2131. if admin == nil {
  2132. // Integration tests skipped.
  2133. return
  2134. }
  2135. ctx := context.Background()
  2136. dbsParent := fmt.Sprintf("projects/%v/instances/%v", testProjectID, testInstanceID)
  2137. dbsIter := admin.ListDatabases(ctx, &adminpb.ListDatabasesRequest{Parent: dbsParent})
  2138. expireAge := 24 * time.Hour
  2139. for {
  2140. db, err := dbsIter.Next()
  2141. if err == iterator.Done {
  2142. break
  2143. }
  2144. if err != nil {
  2145. panic(err)
  2146. }
  2147. // TODO(deklerk) When we have the ability to programmatically create
  2148. // instances, we can create an instance with uid.New and delete all
  2149. // tables in it. For now, we rely on matching prefixes.
  2150. if dbNameSpace.Older(db.Name, expireAge) {
  2151. log.Printf("Dropping database %s", db.Name)
  2152. if err := admin.DropDatabase(ctx, &adminpb.DropDatabaseRequest{Database: db.Name}); err != nil {
  2153. log.Printf("failed to drop database %s (error %v), might need a manual removal",
  2154. db.Name, err)
  2155. }
  2156. }
  2157. }
  2158. }
  2159. func rangeReads(ctx context.Context, t *testing.T, client *Client) {
  2160. checkRange := func(ks KeySet, wantNums ...int) {
  2161. if msg, ok := compareRows(client.Single().Read(ctx, testTable, ks, testTableColumns), wantNums); !ok {
  2162. t.Errorf("key set %+v: %s", ks, msg)
  2163. }
  2164. }
  2165. checkRange(Key{"k1"}, 1)
  2166. checkRange(KeyRange{Key{"k3"}, Key{"k5"}, ClosedOpen}, 3, 4)
  2167. checkRange(KeyRange{Key{"k3"}, Key{"k5"}, ClosedClosed}, 3, 4, 5)
  2168. checkRange(KeyRange{Key{"k3"}, Key{"k5"}, OpenClosed}, 4, 5)
  2169. checkRange(KeyRange{Key{"k3"}, Key{"k5"}, OpenOpen}, 4)
  2170. // Partial key specification.
  2171. checkRange(KeyRange{Key{"k7"}, Key{}, ClosedClosed}, 7, 8, 9)
  2172. checkRange(KeyRange{Key{"k7"}, Key{}, OpenClosed}, 8, 9)
  2173. checkRange(KeyRange{Key{}, Key{"k11"}, ClosedOpen}, 0, 1, 10)
  2174. checkRange(KeyRange{Key{}, Key{"k11"}, ClosedClosed}, 0, 1, 10, 11)
  2175. // The following produce empty ranges.
  2176. // TODO(jba): Consider a multi-part key to illustrate partial key behavior.
  2177. // checkRange(KeyRange{Key{"k7"}, Key{}, ClosedOpen})
  2178. // checkRange(KeyRange{Key{"k7"}, Key{}, OpenOpen})
  2179. // checkRange(KeyRange{Key{}, Key{"k11"}, OpenOpen})
  2180. // checkRange(KeyRange{Key{}, Key{"k11"}, OpenClosed})
  2181. // Prefix is component-wise, not string prefix.
  2182. checkRange(Key{"k1"}.AsPrefix(), 1)
  2183. checkRange(KeyRange{Key{"k1"}, Key{"k2"}, ClosedOpen}, 1, 10, 11, 12, 13, 14)
  2184. checkRange(AllKeys(), 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14)
  2185. }
  2186. func indexRangeReads(ctx context.Context, t *testing.T, client *Client) {
  2187. checkRange := func(ks KeySet, wantNums ...int) {
  2188. if msg, ok := compareRows(client.Single().ReadUsingIndex(ctx, testTable, testTableIndex, ks, testTableColumns),
  2189. wantNums); !ok {
  2190. t.Errorf("key set %+v: %s", ks, msg)
  2191. }
  2192. }
  2193. checkRange(Key{"v1"}, 1)
  2194. checkRange(KeyRange{Key{"v3"}, Key{"v5"}, ClosedOpen}, 3, 4)
  2195. checkRange(KeyRange{Key{"v3"}, Key{"v5"}, ClosedClosed}, 3, 4, 5)
  2196. checkRange(KeyRange{Key{"v3"}, Key{"v5"}, OpenClosed}, 4, 5)
  2197. checkRange(KeyRange{Key{"v3"}, Key{"v5"}, OpenOpen}, 4)
  2198. // // Partial key specification.
  2199. checkRange(KeyRange{Key{"v7"}, Key{}, ClosedClosed}, 7, 8, 9)
  2200. checkRange(KeyRange{Key{"v7"}, Key{}, OpenClosed}, 8, 9)
  2201. checkRange(KeyRange{Key{}, Key{"v11"}, ClosedOpen}, 0, 1, 10)
  2202. checkRange(KeyRange{Key{}, Key{"v11"}, ClosedClosed}, 0, 1, 10, 11)
  2203. // // The following produce empty ranges.
  2204. // checkRange(KeyRange{Key{"v7"}, Key{}, ClosedOpen})
  2205. // checkRange(KeyRange{Key{"v7"}, Key{}, OpenOpen})
  2206. // checkRange(KeyRange{Key{}, Key{"v11"}, OpenOpen})
  2207. // checkRange(KeyRange{Key{}, Key{"v11"}, OpenClosed})
  2208. // // Prefix is component-wise, not string prefix.
  2209. checkRange(Key{"v1"}.AsPrefix(), 1)
  2210. checkRange(KeyRange{Key{"v1"}, Key{"v2"}, ClosedOpen}, 1, 10, 11, 12, 13, 14)
  2211. checkRange(AllKeys(), 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14)
  2212. // Read from an index with DESC ordering.
  2213. wantNums := []int{14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0}
  2214. if msg, ok := compareRows(client.Single().ReadUsingIndex(ctx, testTable, "TestTableByValueDesc", AllKeys(), testTableColumns),
  2215. wantNums); !ok {
  2216. t.Errorf("desc: %s", msg)
  2217. }
  2218. }
  2219. type testTableRow struct{ Key, StringValue string }
  2220. func compareRows(iter *RowIterator, wantNums []int) (string, bool) {
  2221. rows, err := readAllTestTable(iter)
  2222. if err != nil {
  2223. return err.Error(), false
  2224. }
  2225. want := map[string]string{}
  2226. for _, n := range wantNums {
  2227. want[fmt.Sprintf("k%d", n)] = fmt.Sprintf("v%d", n)
  2228. }
  2229. got := map[string]string{}
  2230. for _, r := range rows {
  2231. got[r.Key] = r.StringValue
  2232. }
  2233. if !testEqual(got, want) {
  2234. return fmt.Sprintf("got %v, want %v", got, want), false
  2235. }
  2236. return "", true
  2237. }
  2238. func isNaN(x interface{}) bool {
  2239. f, ok := x.(float64)
  2240. if !ok {
  2241. return false
  2242. }
  2243. return math.IsNaN(f)
  2244. }
  2245. // createClient creates Cloud Spanner data client.
  2246. func createClient(ctx context.Context, dbPath string) (client *Client, err error) {
  2247. client, err = NewClientWithConfig(ctx, dbPath, ClientConfig{
  2248. SessionPoolConfig: SessionPoolConfig{WriteSessions: 0.2},
  2249. }, option.WithTokenSource(testutil.TokenSource(ctx, Scope)), option.WithEndpoint(endpoint))
  2250. if err != nil {
  2251. return nil, fmt.Errorf("cannot create data client on DB %v: %v", dbPath, err)
  2252. }
  2253. return client, nil
  2254. }
  2255. // populate prepares the database with some data.
  2256. func populate(ctx context.Context, client *Client) error {
  2257. // Populate data
  2258. var err error
  2259. m := InsertMap("test", map[string]interface{}{
  2260. "a": str1,
  2261. "b": str2,
  2262. })
  2263. _, err = client.Apply(ctx, []*Mutation{m})
  2264. return err
  2265. }
  2266. func matchError(got error, wantCode codes.Code, wantMsgPart string) (string, bool) {
  2267. if ErrCode(got) != wantCode || !strings.Contains(strings.ToLower(ErrDesc(got)), strings.ToLower(wantMsgPart)) {
  2268. return fmt.Sprintf("got error <%v>\n"+`want <code = %q, "...%s...">`, got, wantCode, wantMsgPart), false
  2269. }
  2270. return "", true
  2271. }
  2272. func rowToValues(r *Row) ([]interface{}, error) {
  2273. var x int64
  2274. var y, z string
  2275. if err := r.Column(0, &x); err != nil {
  2276. return nil, err
  2277. }
  2278. if err := r.Column(1, &y); err != nil {
  2279. return nil, err
  2280. }
  2281. if err := r.Column(2, &z); err != nil {
  2282. return nil, err
  2283. }
  2284. return []interface{}{x, y, z}, nil
  2285. }
  2286. func readAll(iter *RowIterator) ([][]interface{}, error) {
  2287. defer iter.Stop()
  2288. var vals [][]interface{}
  2289. for {
  2290. row, err := iter.Next()
  2291. if err == iterator.Done {
  2292. return vals, nil
  2293. }
  2294. if err != nil {
  2295. return nil, err
  2296. }
  2297. v, err := rowToValues(row)
  2298. if err != nil {
  2299. return nil, err
  2300. }
  2301. vals = append(vals, v)
  2302. }
  2303. }
  2304. func readAllTestTable(iter *RowIterator) ([]testTableRow, error) {
  2305. defer iter.Stop()
  2306. var vals []testTableRow
  2307. for {
  2308. row, err := iter.Next()
  2309. if err == iterator.Done {
  2310. return vals, nil
  2311. }
  2312. if err != nil {
  2313. return nil, err
  2314. }
  2315. var ttr testTableRow
  2316. if err := row.ToStruct(&ttr); err != nil {
  2317. return nil, err
  2318. }
  2319. vals = append(vals, ttr)
  2320. }
  2321. }