/*
Copyright 2017 Google LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package spanner

import (
	"bytes"
	"context"
	"io"
	"log"
	"sync/atomic"
	"time"

	"cloud.google.com/go/internal/protostruct"
	"cloud.google.com/go/spanner/internal/backoff"
	proto "github.com/golang/protobuf/proto"
	proto3 "github.com/golang/protobuf/ptypes/struct"
	"google.golang.org/api/iterator"
	sppb "google.golang.org/genproto/googleapis/spanner/v1"
	"google.golang.org/grpc/codes"
)

// streamingReceiver is the interface for receiving data from a client side
// stream.
type streamingReceiver interface {
	Recv() (*sppb.PartialResultSet, error)
}

// errEarlyReadEnd returns the error for a read that finishes while the gRPC
// stream is still active.
func errEarlyReadEnd() error {
	return spannerErrorf(codes.FailedPrecondition, "read completed with active stream")
}

// stream is the internal fault-tolerant method for streaming data from
// Cloud Spanner.
func stream(ctx context.Context, rpc func(ct context.Context, resumeToken []byte) (streamingReceiver, error), setTimestamp func(time.Time), release func(error)) *RowIterator {
	ctx, cancel := context.WithCancel(ctx)
	ctx = startSpan(ctx, "cloud.google.com/go/spanner.RowIterator")
	return &RowIterator{
		streamd:      newResumableStreamDecoder(ctx, rpc),
		rowd:         &partialResultSetDecoder{},
		setTimestamp: setTimestamp,
		release:      release,
		cancel:       cancel,
	}
}

// RowIterator is an iterator over Rows.
type RowIterator struct {
	// The plan for the query. Available after RowIterator.Next returns
	// iterator.Done if QueryWithStats was called.
	QueryPlan *sppb.QueryPlan

	// Execution statistics for the query. Available after RowIterator.Next
	// returns iterator.Done if QueryWithStats was called.
	QueryStats map[string]interface{}

	// For a DML statement, the number of rows affected. For PDML, this is a
	// lower bound. Available for DML statements after RowIterator.Next
	// returns iterator.Done.
	RowCount int64

	streamd      *resumableStreamDecoder
	rowd         *partialResultSetDecoder
	setTimestamp func(time.Time)
	release      func(error)
	cancel       func()
	err          error
	rows         []*Row
	sawStats     bool
}

// Next returns the next result. Its second return value is iterator.Done if
// there are no more results. Once Next returns Done, all subsequent calls
// will return Done.
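//
// A typical iteration loop looks like the following (an illustrative sketch;
// ctx, client and stmt are assumed to be defined by the caller):
//
//	iter := client.Single().Query(ctx, stmt)
//	defer iter.Stop()
//	for {
//		row, err := iter.Next()
//		if err == iterator.Done {
//			break
//		}
//		if err != nil {
//			// TODO: Handle error.
//		}
//		// TODO: Use row.
//	}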
func (r *RowIterator) Next() (*Row, error) {
	if r.err != nil {
		return nil, r.err
	}
	for len(r.rows) == 0 && r.streamd.next() {
		prs := r.streamd.get()
		if prs.Stats != nil {
			r.sawStats = true
			r.QueryPlan = prs.Stats.QueryPlan
			r.QueryStats = protostruct.DecodeToMap(prs.Stats.QueryStats)
			if prs.Stats.RowCount != nil {
				rc, err := extractRowCount(prs.Stats)
				if err != nil {
					return nil, err
				}
				r.RowCount = rc
			}
		}
		r.rows, r.err = r.rowd.add(prs)
		if r.err != nil {
			return nil, r.err
		}
		if !r.rowd.ts.IsZero() && r.setTimestamp != nil {
			r.setTimestamp(r.rowd.ts)
			r.setTimestamp = nil
		}
	}
	if len(r.rows) > 0 {
		row := r.rows[0]
		r.rows = r.rows[1:]
		return row, nil
	}
	if err := r.streamd.lastErr(); err != nil {
		r.err = toSpannerError(err)
	} else if !r.rowd.done() {
		r.err = errEarlyReadEnd()
	} else {
		r.err = iterator.Done
	}
	return nil, r.err
}

func extractRowCount(stats *sppb.ResultSetStats) (int64, error) {
	if stats.RowCount == nil {
		return 0, spannerErrorf(codes.Internal, "missing RowCount")
	}
	switch rc := stats.RowCount.(type) {
	case *sppb.ResultSetStats_RowCountExact:
		return rc.RowCountExact, nil
	case *sppb.ResultSetStats_RowCountLowerBound:
		return rc.RowCountLowerBound, nil
	default:
		return 0, spannerErrorf(codes.Internal, "unknown RowCount type %T", stats.RowCount)
	}
}
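
// As noted on RowIterator.RowCount above, a regular DML statement yields an
// exact count (ResultSetStats_RowCountExact), while partitioned DML yields
// only a lower bound (ResultSetStats_RowCountLowerBound).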

// Do calls the provided function once in sequence for each row in the
// iteration. If the function returns a non-nil error, Do immediately returns
// that error.
//
// If there are no rows in the iterator, Do will return nil without calling
// the provided function.
//
// Do always calls Stop on the iterator.
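//
// For example (an illustrative sketch; ctx, client and stmt are assumed to
// be defined by the caller):
//
//	iter := client.Single().Query(ctx, stmt)
//	err := iter.Do(func(row *Row) error {
//		var name string
//		if err := row.Column(0, &name); err != nil {
//			return err
//		}
//		// TODO: Use name.
//		return nil
//	})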
func (r *RowIterator) Do(f func(r *Row) error) error {
	defer r.Stop()
	for {
		row, err := r.Next()
		switch err {
		case iterator.Done:
			return nil
		case nil:
			if err = f(row); err != nil {
				return err
			}
		default:
			return err
		}
	}
}

// Stop terminates the iteration. It should be called after you finish using
// the iterator.
func (r *RowIterator) Stop() {
	if r.streamd != nil {
		defer endSpan(r.streamd.ctx, r.err)
	}
	if r.cancel != nil {
		r.cancel()
	}
	if r.release != nil {
		r.release(r.err)
		if r.err == nil {
			r.err = spannerErrorf(codes.FailedPrecondition, "Next called after Stop")
		}
		r.release = nil
	}
}

// partialResultQueue implements a simple FIFO queue. The zero value is a
// valid queue.
type partialResultQueue struct {
	q     []*sppb.PartialResultSet
	first int
	last  int
	n     int // number of elements in queue
}

// empty reports whether the partialResultQueue is empty.
func (q *partialResultQueue) empty() bool {
	return q.n == 0
}

// errEmptyQueue returns the error for dequeuing from an empty queue.
func errEmptyQueue() error {
	return spannerErrorf(codes.OutOfRange, "empty partialResultQueue")
}

// peekLast returns the last item in partialResultQueue; if the queue
// is empty, it returns an error.
func (q *partialResultQueue) peekLast() (*sppb.PartialResultSet, error) {
	if q.empty() {
		return nil, errEmptyQueue()
	}
	return q.q[(q.last+cap(q.q)-1)%cap(q.q)], nil
}

// push adds an item to the tail of partialResultQueue.
func (q *partialResultQueue) push(r *sppb.PartialResultSet) {
	if q.q == nil {
		q.q = make([]*sppb.PartialResultSet, 8 /* arbitrary */)
	}
	if q.n == cap(q.q) {
		buf := make([]*sppb.PartialResultSet, cap(q.q)*2)
		for i := 0; i < q.n; i++ {
			buf[i] = q.q[(q.first+i)%cap(q.q)]
		}
		q.q = buf
		q.first = 0
		q.last = q.n
	}
	q.q[q.last] = r
	q.last = (q.last + 1) % cap(q.q)
	q.n++
}

// pop removes an item from the head of partialResultQueue and returns it.
func (q *partialResultQueue) pop() *sppb.PartialResultSet {
	if q.n == 0 {
		return nil
	}
	r := q.q[q.first]
	q.q[q.first] = nil
	q.first = (q.first + 1) % cap(q.q)
	q.n--
	return r
}

// clear empties partialResultQueue.
func (q *partialResultQueue) clear() {
	*q = partialResultQueue{}
}

// dump retrieves all items from partialResultQueue and returns them in a
// slice. It is used only in tests.
func (q *partialResultQueue) dump() []*sppb.PartialResultSet {
	var dq []*sppb.PartialResultSet
	for i := q.first; len(dq) < q.n; i = (i + 1) % cap(q.q) {
		dq = append(dq, q.q[i])
	}
	return dq
}
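
// An illustrative walk-through of the ring-buffer arithmetic above (not part
// of the API): with cap(q.q) == 8, first == 6 and n == 4, the queued items
// occupy indices 6, 7, 0 and 1; push writes at last == 2, pop reads index 6,
// and peekLast reads index (2+8-1)%8 == 1, the most recently pushed item.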

// resumableStreamDecoderState encodes resumableStreamDecoder's status.
// See also the comments for resumableStreamDecoder.next.
type resumableStreamDecoderState int

const (
	unConnected         resumableStreamDecoderState = iota // 0
	queueingRetryable                                      // 1
	queueingUnretryable                                    // 2
	aborted                                                // 3
	finished                                               // 4
)

// resumableStreamDecoder provides a resumable interface for receiving
// sppb.PartialResultSet(s) from a given query wrapped by
// resumableStreamDecoder.rpc().
type resumableStreamDecoder struct {
	// state is the current status of resumableStreamDecoder, see also
	// the comments for resumableStreamDecoder.next.
	state resumableStreamDecoderState

	// stateWitness, when non-nil, is called to observe state changes.
	// It is used for testing.
	stateWitness func(resumableStreamDecoderState)

	// ctx is the caller's context, used to cancel or time out next().
	ctx context.Context

	// rpc is a factory of streamingReceiver, which might resume
	// a previous stream from the point encoded in restartToken.
	// rpc is always a wrapper of a Cloud Spanner query which is
	// resumable.
	rpc func(ctx context.Context, restartToken []byte) (streamingReceiver, error)

	// stream is the current RPC streaming receiver.
	stream streamingReceiver

	// q buffers received yet undecoded partial results.
	q partialResultQueue

	// bytesBetweenResumeTokens is a proxy for the byte size of the
	// PartialResultSets being queued between two resume tokens. Once
	// bytesBetweenResumeTokens exceeds maxBytesBetweenResumeTokens,
	// resumableStreamDecoder goes into queueingUnretryable state.
	bytesBetweenResumeTokens int32

	// maxBytesBetweenResumeTokens is the maximum number of bytes that can be
	// buffered between two resume tokens. It is always copied from the
	// global maxBytesBetweenResumeTokens atomically.
	maxBytesBetweenResumeTokens int32

	// np is the next sppb.PartialResultSet ready to be returned
	// to the caller of resumableStreamDecoder.get().
	np *sppb.PartialResultSet

	// resumeToken stores the resume token that resumableStreamDecoder has
	// last revealed to the caller.
	resumeToken []byte

	// retryCount is the number of retries that have been carried out so far.
	retryCount int

	// err is the last error resumableStreamDecoder has encountered so far.
	err error

	// backoff is used to compute delays between retries.
	backoff backoff.ExponentialBackoff
}

// newResumableStreamDecoder creates a new resumableStreamDecoder instance.
// Parameter rpc should be a function that creates a new stream beginning at
// the restartToken if non-nil.
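//
// An rpc factory is typically a closure over a streaming request, e.g. (an
// illustrative sketch; client and req are assumed to be defined by the
// caller):
//
//	rpc := func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) {
//		req.ResumeToken = resumeToken
//		return client.ExecuteStreamingSql(ctx, req)
//	}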
func newResumableStreamDecoder(ctx context.Context, rpc func(ct context.Context, restartToken []byte) (streamingReceiver, error)) *resumableStreamDecoder {
	return &resumableStreamDecoder{
		ctx:                         ctx,
		rpc:                         rpc,
		maxBytesBetweenResumeTokens: atomic.LoadInt32(&maxBytesBetweenResumeTokens),
		backoff:                     backoff.DefaultBackoff,
	}
}

// changeState performs a state transition for resumableStreamDecoder.
func (d *resumableStreamDecoder) changeState(target resumableStreamDecoderState) {
	if d.state == queueingRetryable && d.state != target {
		// Reset bytesBetweenResumeTokens because it is only meaningful/changed
		// under queueingRetryable state.
		d.bytesBetweenResumeTokens = 0
	}
	d.state = target
	if d.stateWitness != nil {
		d.stateWitness(target)
	}
}

// isNewResumeToken reports whether the observed resume token is different
// from the one last returned by the server.
func (d *resumableStreamDecoder) isNewResumeToken(rt []byte) bool {
	if rt == nil {
		return false
	}
	if bytes.Equal(rt, d.resumeToken) {
		return false
	}
	return true
}

// next advances to the next available partial result set. It returns false
// if an error occurs or there are no more results; call lastErr to determine
// whether an error was encountered. The following summary describes the
// state machine of resumableStreamDecoder that next() implements. Note that
// state transitions can only be triggered by RPC activity.
/*
	unConnected:
		rpc() fails retryable                  -> unConnected (retry after backoff)
		rpc() fails unretryable, or ctx
		    times out or is cancelled          -> aborted
		rpc() succeeds                         -> queueingRetryable

	queueingRetryable:
		recv() succeeds                        -> queueingRetryable
		recv() fails retryable with valid ctx  -> unConnected
		recv() fails                           -> aborted
		recv EOF                               -> finished
		queue size exceeds threshold           -> queueingUnretryable

	queueingUnretryable:
		recv() succeeds                        -> queueingUnretryable
		recv() fails unretryable               -> aborted
		recv EOF                               -> finished
		pop() returns resume token             -> queueingRetryable
*/

var (
	// maxBytesBetweenResumeTokens is the maximum number of bytes that
	// resumableStreamDecoder in queueingRetryable state can use to queue
	// PartialResultSets before getting into queueingUnretryable state.
	maxBytesBetweenResumeTokens = int32(128 * 1024 * 1024)
)

func (d *resumableStreamDecoder) next() bool {
	for {
		select {
		case <-d.ctx.Done():
			// Check the context here so that even if gRPC fails to do so,
			// resumableStreamDecoder can still break the loop as expected.
			d.err = errContextCanceled(d.ctx, d.err)
			d.changeState(aborted)
		default:
		}
		switch d.state {
		case unConnected:
			// If no gRPC stream is available, try to initiate one.
			if d.stream, d.err = d.rpc(d.ctx, d.resumeToken); d.err != nil {
				if isRetryable(d.err) {
					d.doBackOff()
					// Be explicit about the state transition, although the
					// state doesn't actually change. State transitions are
					// triggered only by RPC activity, regardless of whether
					// there is an actual state change or not.
					d.changeState(unConnected)
					continue
				}
				d.changeState(aborted)
				continue
			}
			d.resetBackOff()
			d.changeState(queueingRetryable)
			continue
		case queueingRetryable:
			fallthrough
		case queueingUnretryable:
			// Check the last item in the receiving queue.
			last, err := d.q.peekLast()
			if err != nil {
				// peekLast returns an error only when the receiving queue
				// is empty; in that case, try to receive from the stream.
				d.tryRecv()
				continue
			}
			if d.isNewResumeToken(last.ResumeToken) {
				// Got a new resume token, so return the buffered
				// sppb.PartialResultSets to the caller.
				d.np = d.q.pop()
				if d.q.empty() {
					d.bytesBetweenResumeTokens = 0
					// The new resume token was just popped out of the
					// queue; record it.
					d.resumeToken = d.np.ResumeToken
					d.changeState(queueingRetryable)
				}
				return true
			}
			if d.bytesBetweenResumeTokens >= d.maxBytesBetweenResumeTokens && d.state == queueingRetryable {
				d.changeState(queueingUnretryable)
				continue
			}
			if d.state == queueingUnretryable {
				// When no resume token has been observed, only yield
				// sppb.PartialResultSets to the caller in
				// queueingUnretryable state.
				d.np = d.q.pop()
				return true
			}
			// Need to receive more from the gRPC stream until a new resume
			// token is observed.
			d.tryRecv()
			continue
		case aborted:
			// Discard all pending items because none of them should be
			// yielded to the caller.
			d.q.clear()
			return false
		case finished:
			// If the query has finished, check whether there are still
			// buffered messages.
			if d.q.empty() {
				// No buffered PartialResultSet.
				return false
			}
			// Although the query has finished, there are still buffered
			// PartialResultSets.
			d.np = d.q.pop()
			return true
		default:
			log.Printf("Unexpected resumableStreamDecoder.state: %v", d.state)
			return false
		}
	}
}

// tryRecv attempts to receive a PartialResultSet from the gRPC stream.
func (d *resumableStreamDecoder) tryRecv() {
	var res *sppb.PartialResultSet
	if res, d.err = d.stream.Recv(); d.err != nil {
		if d.err == io.EOF {
			d.err = nil
			d.changeState(finished)
			return
		}
		if isRetryable(d.err) && d.state == queueingRetryable {
			d.err = nil
			// Discard all queue items (none have resume tokens).
			d.q.clear()
			d.stream = nil
			d.changeState(unConnected)
			d.doBackOff()
			return
		}
		d.changeState(aborted)
		return
	}
	d.q.push(res)
	if d.state == queueingRetryable && !d.isNewResumeToken(res.ResumeToken) {
		// Adjust d.bytesBetweenResumeTokens.
		d.bytesBetweenResumeTokens += int32(proto.Size(res))
	}
	d.resetBackOff()
	d.changeState(d.state)
}

// resetBackOff clears the internal retry counter of resumableStreamDecoder
// so that the next exponential backoff will start at a fresh state.
func (d *resumableStreamDecoder) resetBackOff() {
	d.retryCount = 0
}

// doBackOff does an exponential backoff sleep.
func (d *resumableStreamDecoder) doBackOff() {
	delay := d.backoff.Delay(d.retryCount)
	statsPrintf(d.ctx, nil, "Backing off stream read for %s", delay)
	ticker := time.NewTicker(delay)
	defer ticker.Stop()
	d.retryCount++
	select {
	case <-d.ctx.Done():
	case <-ticker.C:
	}
}

// get returns the most recent PartialResultSet generated by a call to next.
func (d *resumableStreamDecoder) get() *sppb.PartialResultSet {
	return d.np
}

// lastErr returns the last non-EOF error encountered.
func (d *resumableStreamDecoder) lastErr() error {
	return d.err
}

// partialResultSetDecoder assembles PartialResultSet(s) into Cloud Spanner
// Rows.
type partialResultSetDecoder struct {
	row     Row
	tx      *sppb.Transaction
	chunked bool      // if true, the next value should be merged with the last values entry
	ts      time.Time // read timestamp
}

// yield checks whether we have a complete row, and if so, returns it. A row
// is not complete if it doesn't have enough columns, or if this is a chunked
// response and there are no further values to process.
func (p *partialResultSetDecoder) yield(chunked, last bool) *Row {
	if len(p.row.vals) == len(p.row.fields) && (!chunked || !last) {
		// When partialResultSetDecoder has received enough column values,
		// there are two cases in which a new Row should be yielded:
		//
		//  1. The incoming PartialResultSet is not chunked;
		//  2. The incoming PartialResultSet is chunked, but the
		//     proto3.Value being merged is not the last one in
		//     the PartialResultSet.
		//
		// Use a fresh Row to simplify clients that want to use yielded results
		// after the next row is retrieved. Note that fields is never changed
		// so it doesn't need to be copied.
		fresh := Row{
			fields: p.row.fields,
			vals:   make([]*proto3.Value, len(p.row.vals)),
		}
		copy(fresh.vals, p.row.vals)
		p.row.vals = p.row.vals[:0] // empty and reuse slice
		return &fresh
	}
	return nil
}

// errChunkedEmptyRow returns the error for receiving a chunked
// PartialResultSet with an empty row.
func errChunkedEmptyRow() error {
	return spannerErrorf(codes.FailedPrecondition, "got invalid chunked PartialResultSet with empty Row")
}

// add tries to merge a new PartialResultSet into the buffered Row. It
// returns any rows that have been completed as a result.
func (p *partialResultSetDecoder) add(r *sppb.PartialResultSet) ([]*Row, error) {
	var rows []*Row
	if r.Metadata != nil {
		// Metadata should only be returned in the first result.
		if p.row.fields == nil {
			p.row.fields = r.Metadata.RowType.Fields
		}
		if p.tx == nil && r.Metadata.Transaction != nil {
			p.tx = r.Metadata.Transaction
			if p.tx.ReadTimestamp != nil {
				p.ts = time.Unix(p.tx.ReadTimestamp.Seconds, int64(p.tx.ReadTimestamp.Nanos))
			}
		}
	}
	if len(r.Values) == 0 {
		return nil, nil
	}
	if p.chunked {
		p.chunked = false
		// Try to merge the first value in r.Values into the
		// uncompleted row.
		last := len(p.row.vals) - 1
		if last < 0 { // sanity check
			return nil, errChunkedEmptyRow()
		}
		var err error
		// If p is chunked, then we should always try to merge p.last with
		// r.first.
		if p.row.vals[last], err = p.merge(p.row.vals[last], r.Values[0]); err != nil {
			return nil, err
		}
		r.Values = r.Values[1:]
		// The merge is done; try to yield a complete Row.
		if row := p.yield(r.ChunkedValue, len(r.Values) == 0); row != nil {
			rows = append(rows, row)
		}
	}
	for i, v := range r.Values {
		// The remaining values in r can be appended to p directly.
		p.row.vals = append(p.row.vals, v)
		// Again, check whether a complete Row can be yielded because of
		// the newly added value.
		if row := p.yield(r.ChunkedValue, i == len(r.Values)-1); row != nil {
			rows = append(rows, row)
		}
	}
	if r.ChunkedValue {
		// After dealing with all values in r: if r is chunked, then p must
		// also be chunked.
		p.chunked = true
	}
	return rows, nil
}
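
// An illustrative example of chunking (abridged, not taken from a real
// stream): a row with string columns ("foo", "bar") may arrive as two
// PartialResultSets, the first carrying values ["foo", "ba"] with
// chunked_value set, and the second carrying ["r"]; add merges "ba" with
// "r" into "bar" before yielding the completed row.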

// isMergeable reports whether a protobuf Value can potentially be merged
// with other protobuf Values.
func (p *partialResultSetDecoder) isMergeable(a *proto3.Value) bool {
	switch a.Kind.(type) {
	case *proto3.Value_StringValue:
		return true
	case *proto3.Value_ListValue:
		return true
	default:
		return false
	}
}

// errIncompatibleMergeTypes returns the error for incompatible protobuf
// types that cannot be merged by partialResultSetDecoder.
func errIncompatibleMergeTypes(a, b *proto3.Value) error {
	return spannerErrorf(codes.FailedPrecondition, "incompatible type in chunked PartialResultSet. expected (%T), got (%T)", a.Kind, b.Kind)
}

// errUnsupportedMergeType returns the error for a protobuf type that cannot
// be merged with other protobuf Values.
func errUnsupportedMergeType(a *proto3.Value) error {
	return spannerErrorf(codes.FailedPrecondition, "unsupported type merge (%T)", a.Kind)
}

// merge tries to combine two protobuf Values if possible.
func (p *partialResultSetDecoder) merge(a, b *proto3.Value) (*proto3.Value, error) {
	var err error
	typeErr := errIncompatibleMergeTypes(a, b)
	switch t := a.Kind.(type) {
	case *proto3.Value_StringValue:
		s, ok := b.Kind.(*proto3.Value_StringValue)
		if !ok {
			return nil, typeErr
		}
		return &proto3.Value{
			Kind: &proto3.Value_StringValue{StringValue: t.StringValue + s.StringValue},
		}, nil
	case *proto3.Value_ListValue:
		l, ok := b.Kind.(*proto3.Value_ListValue)
		if !ok {
			return nil, typeErr
		}
		if l.ListValue == nil || len(l.ListValue.Values) <= 0 {
			// b is an empty list, just return a.
			return a, nil
		}
		if t.ListValue == nil || len(t.ListValue.Values) <= 0 {
			// a is an empty list, just return b.
			return b, nil
		}
		if la := len(t.ListValue.Values) - 1; p.isMergeable(t.ListValue.Values[la]) {
			// When the last item in a is of type String, List or Struct
			// (encoded as List by Cloud Spanner), try to merge the last
			// item in a with the first item in b.
			t.ListValue.Values[la], err = p.merge(t.ListValue.Values[la], l.ListValue.Values[0])
			if err != nil {
				return nil, err
			}
			l.ListValue.Values = l.ListValue.Values[1:]
		}
		return &proto3.Value{
			Kind: &proto3.Value_ListValue{
				ListValue: &proto3.ListValue{
					Values: append(t.ListValue.Values, l.ListValue.Values...),
				},
			},
		}, nil
	default:
		return nil, errUnsupportedMergeType(a)
	}
}
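
// For illustration (not taken from a real stream): merging the chunked
// string values "Hel" and "lo" yields "Hello", and merging the lists
// ["a", "b"] and ["c", "d"] yields ["a", "bc", "d"], because the mergeable
// last element of the first list is combined with the first element of the
// second.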

// done reports whether partialResultSetDecoder has finished processing all
// buffered values.
func (p *partialResultSetDecoder) done() bool {
	// There is no explicit end-of-stream marker, but ending partway through
	// a row is obviously bad, as is ending with the last column still
	// awaiting completion.
	return len(p.row.vals) == 0 && !p.chunked
}