You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

348 lines
10 KiB

  1. /*
  2. Copyright 2018 Google LLC
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package spanner
  14. import (
  15. "bytes"
  16. "context"
  17. "encoding/gob"
  18. "log"
  19. "time"
  20. "github.com/golang/protobuf/proto"
  21. sppb "google.golang.org/genproto/googleapis/spanner/v1"
  22. )
  // BatchReadOnlyTransaction is a ReadOnlyTransaction that allows for exporting
  // arbitrarily large amounts of data from Cloud Spanner databases.
  // BatchReadOnlyTransaction partitions a read/query request. Read/query request
  // can then be executed independently over each partition while observing the
  // same snapshot of the database. BatchReadOnlyTransaction can also be shared
  // across multiple clients by passing around the BatchReadOnlyTransactionID and
  // then recreating the transaction using Client.BatchReadOnlyTransactionFromID.
  //
  // Note: if a client is used only to run partitions, you can
  // create it using a ClientConfig with both MinOpened and MaxIdle set to
  // zero to avoid creating unnecessary sessions. You can also avoid excess
  // gRPC channels by setting ClientConfig.NumChannels to the number of
  // concurrently active BatchReadOnlyTransactions you expect to have.
  type BatchReadOnlyTransaction struct {
  	// ReadOnlyTransaction is embedded, so its fields and methods are promoted
  	// onto BatchReadOnlyTransaction.
  	ReadOnlyTransaction
  	// ID identifies this transaction. As described on
  	// BatchReadOnlyTransactionID, it can be used to re-create the transaction
  	// in another process via Client.BatchReadOnlyTransactionFromID.
  	ID BatchReadOnlyTransactionID
  }
  // BatchReadOnlyTransactionID is a unique identifier for a
  // BatchReadOnlyTransaction. It can be used to re-create a
  // BatchReadOnlyTransaction on a different machine or process by calling
  // Client.BatchReadOnlyTransactionFromID.
  //
  // All fields are unexported; the value is moved between processes through
  // its MarshalBinary and UnmarshalBinary methods, which gob-encode the fields
  // in the order tid, sid, rts.
  type BatchReadOnlyTransactionID struct {
  	// tid is the unique ID for the transaction.
  	tid transactionID
  	// sid is the id of the Cloud Spanner session used for this transaction.
  	sid string
  	// rts is the read timestamp of this transaction.
  	rts time.Time
  }
  // Partition defines a segment of data to be read in a batch read or query. A
  // partition can be serialized and processed across several different machines
  // or processes.
  //
  // A Partition carries either a read request or a query request. Execute and
  // MarshalBinary treat it as a read partition whenever rreq is non-nil and as
  // a query partition otherwise.
  type Partition struct {
  	// pt is the partition token taken from the partition response.
  	pt []byte
  	// qreq is the SQL request to run for this partition; it is set by
  	// PartitionQuery.
  	qreq *sppb.ExecuteSqlRequest
  	// rreq is the read request to run for this partition; it is set by
  	// PartitionReadUsingIndex (and therefore by PartitionRead).
  	rreq *sppb.ReadRequest
  }
  // PartitionOptions specifies options for a PartitionQueryRequest and
  // PartitionReadRequest. See
  // https://godoc.org/google.golang.org/genproto/googleapis/spanner/v1#PartitionOptions
  // for more details.
  type PartitionOptions struct {
  	// The desired data size for each partition generated. It is sent as
  	// PartitionSizeBytes in the sppb.PartitionOptions message.
  	PartitionBytes int64
  	// The desired maximum number of partitions to return. It is sent as
  	// MaxPartitions in the sppb.PartitionOptions message.
  	MaxPartitions int64
  }
  70. // toProto converts a spanner.PartitionOptions into a sppb.PartitionOptions
  71. func (opt PartitionOptions) toProto() *sppb.PartitionOptions {
  72. return &sppb.PartitionOptions{
  73. PartitionSizeBytes: opt.PartitionBytes,
  74. MaxPartitions: opt.MaxPartitions,
  75. }
  76. }
  77. // PartitionRead returns a list of Partitions that can be used to read rows from
  78. // the database. These partitions can be executed across multiple processes,
  79. // even across different machines. The partition size and count hints can be
  80. // configured using PartitionOptions.
  81. func (t *BatchReadOnlyTransaction) PartitionRead(ctx context.Context, table string, keys KeySet, columns []string, opt PartitionOptions) ([]*Partition, error) {
  82. return t.PartitionReadUsingIndex(ctx, table, "", keys, columns, opt)
  83. }
  84. // PartitionReadUsingIndex returns a list of Partitions that can be used to read
  85. // rows from the database using an index.
  86. func (t *BatchReadOnlyTransaction) PartitionReadUsingIndex(ctx context.Context, table, index string, keys KeySet, columns []string, opt PartitionOptions) ([]*Partition, error) {
  87. sh, ts, err := t.acquire(ctx)
  88. if err != nil {
  89. return nil, err
  90. }
  91. sid, client := sh.getID(), sh.getClient()
  92. var (
  93. kset *sppb.KeySet
  94. resp *sppb.PartitionResponse
  95. partitions []*Partition
  96. )
  97. kset, err = keys.keySetProto()
  98. // request Partitions
  99. if err != nil {
  100. return nil, err
  101. }
  102. resp, err = client.PartitionRead(ctx, &sppb.PartitionReadRequest{
  103. Session: sid,
  104. Transaction: ts,
  105. Table: table,
  106. Index: index,
  107. Columns: columns,
  108. KeySet: kset,
  109. PartitionOptions: opt.toProto(),
  110. })
  111. // prepare ReadRequest
  112. req := &sppb.ReadRequest{
  113. Session: sid,
  114. Transaction: ts,
  115. Table: table,
  116. Index: index,
  117. Columns: columns,
  118. KeySet: kset,
  119. }
  120. // generate Partitions
  121. for _, p := range resp.GetPartitions() {
  122. partitions = append(partitions, &Partition{
  123. pt: p.PartitionToken,
  124. rreq: req,
  125. })
  126. }
  127. return partitions, err
  128. }
  129. // PartitionQuery returns a list of Partitions that can be used to execute a query against the database.
  130. func (t *BatchReadOnlyTransaction) PartitionQuery(ctx context.Context, statement Statement, opt PartitionOptions) ([]*Partition, error) {
  131. sh, ts, err := t.acquire(ctx)
  132. if err != nil {
  133. return nil, err
  134. }
  135. sid, client := sh.getID(), sh.getClient()
  136. params, paramTypes, err := statement.convertParams()
  137. if err != nil {
  138. return nil, err
  139. }
  140. // request Partitions
  141. req := &sppb.PartitionQueryRequest{
  142. Session: sid,
  143. Transaction: ts,
  144. Sql: statement.SQL,
  145. PartitionOptions: opt.toProto(),
  146. Params: params,
  147. ParamTypes: paramTypes,
  148. }
  149. resp, err := client.PartitionQuery(ctx, req)
  150. // prepare ExecuteSqlRequest
  151. r := &sppb.ExecuteSqlRequest{
  152. Session: sid,
  153. Transaction: ts,
  154. Sql: statement.SQL,
  155. Params: params,
  156. ParamTypes: paramTypes,
  157. }
  158. // generate Partitions
  159. var partitions []*Partition
  160. for _, p := range resp.GetPartitions() {
  161. partitions = append(partitions, &Partition{
  162. pt: p.PartitionToken,
  163. qreq: r,
  164. })
  165. }
  166. return partitions, err
  167. }
  168. // release implements txReadEnv.release, noop.
  169. func (t *BatchReadOnlyTransaction) release(err error) {
  170. }
  171. // setTimestamp implements txReadEnv.setTimestamp, noop.
  172. // read timestamp is ready on txn initialization, avoid contending writing to it with future partitions.
  173. func (t *BatchReadOnlyTransaction) setTimestamp(ts time.Time) {
  174. }
  175. // Close marks the txn as closed.
  176. func (t *BatchReadOnlyTransaction) Close() {
  177. t.mu.Lock()
  178. defer t.mu.Unlock()
  179. t.state = txClosed
  180. }
  181. // Cleanup cleans up all the resources used by this transaction and makes
  182. // it unusable. Once this method is invoked, the transaction is no longer
  183. // usable anywhere, including other clients/processes with which this
  184. // transaction was shared.
  185. //
  186. // Calling Cleanup is optional, but recommended. If Cleanup is not called, the
  187. // transaction's resources will be freed when the session expires on the backend and
  188. // is deleted. For more information about recycled sessions, see
  189. // https://cloud.google.com/spanner/docs/sessions.
  190. func (t *BatchReadOnlyTransaction) Cleanup(ctx context.Context) {
  191. t.Close()
  192. t.mu.Lock()
  193. defer t.mu.Unlock()
  194. sh := t.sh
  195. if sh == nil {
  196. return
  197. }
  198. t.sh = nil
  199. sid, client := sh.getID(), sh.getClient()
  200. err := runRetryable(ctx, func(ctx context.Context) error {
  201. _, e := client.DeleteSession(ctx, &sppb.DeleteSessionRequest{Name: sid})
  202. return e
  203. })
  204. if err != nil {
  205. log.Printf("Failed to delete session %v. Error: %v", sid, err)
  206. }
  207. }
  208. // Execute runs a single Partition obtained from PartitionRead or PartitionQuery.
  209. func (t *BatchReadOnlyTransaction) Execute(ctx context.Context, p *Partition) *RowIterator {
  210. var (
  211. sh *sessionHandle
  212. err error
  213. rpc func(ct context.Context, resumeToken []byte) (streamingReceiver, error)
  214. )
  215. if sh, _, err = t.acquire(ctx); err != nil {
  216. return &RowIterator{err: err}
  217. }
  218. client := sh.getClient()
  219. if client == nil {
  220. // Might happen if transaction is closed in the middle of a API call.
  221. return &RowIterator{err: errSessionClosed(sh)}
  222. }
  223. // read or query partition
  224. if p.rreq != nil {
  225. p.rreq.PartitionToken = p.pt
  226. rpc = func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) {
  227. p.rreq.ResumeToken = resumeToken
  228. return client.StreamingRead(ctx, p.rreq)
  229. }
  230. } else {
  231. p.qreq.PartitionToken = p.pt
  232. rpc = func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) {
  233. p.qreq.ResumeToken = resumeToken
  234. return client.ExecuteStreamingSql(ctx, p.qreq)
  235. }
  236. }
  237. return stream(
  238. contextWithOutgoingMetadata(ctx, sh.getMetadata()),
  239. rpc,
  240. t.setTimestamp,
  241. t.release)
  242. }
  243. // MarshalBinary implements BinaryMarshaler.
  244. func (tid BatchReadOnlyTransactionID) MarshalBinary() (data []byte, err error) {
  245. var buf bytes.Buffer
  246. enc := gob.NewEncoder(&buf)
  247. if err := enc.Encode(tid.tid); err != nil {
  248. return nil, err
  249. }
  250. if err := enc.Encode(tid.sid); err != nil {
  251. return nil, err
  252. }
  253. if err := enc.Encode(tid.rts); err != nil {
  254. return nil, err
  255. }
  256. return buf.Bytes(), nil
  257. }
  258. // UnmarshalBinary implements BinaryUnmarshaler.
  259. func (tid *BatchReadOnlyTransactionID) UnmarshalBinary(data []byte) error {
  260. dec := gob.NewDecoder(bytes.NewReader(data))
  261. if err := dec.Decode(&tid.tid); err != nil {
  262. return err
  263. }
  264. if err := dec.Decode(&tid.sid); err != nil {
  265. return err
  266. }
  267. return dec.Decode(&tid.rts)
  268. }
  269. // MarshalBinary implements BinaryMarshaler.
  270. func (p Partition) MarshalBinary() (data []byte, err error) {
  271. var buf bytes.Buffer
  272. enc := gob.NewEncoder(&buf)
  273. if err := enc.Encode(p.pt); err != nil {
  274. return nil, err
  275. }
  276. var isReadPartition bool
  277. var req proto.Message
  278. if p.rreq != nil {
  279. isReadPartition = true
  280. req = p.rreq
  281. } else {
  282. isReadPartition = false
  283. req = p.qreq
  284. }
  285. if err := enc.Encode(isReadPartition); err != nil {
  286. return nil, err
  287. }
  288. if data, err = proto.Marshal(req); err != nil {
  289. return nil, err
  290. }
  291. if err := enc.Encode(data); err != nil {
  292. return nil, err
  293. }
  294. return buf.Bytes(), nil
  295. }
  296. // UnmarshalBinary implements BinaryUnmarshaler.
  297. func (p *Partition) UnmarshalBinary(data []byte) error {
  298. var (
  299. isReadPartition bool
  300. d []byte
  301. err error
  302. )
  303. dec := gob.NewDecoder(bytes.NewReader(data))
  304. if err := dec.Decode(&p.pt); err != nil {
  305. return err
  306. }
  307. if err := dec.Decode(&isReadPartition); err != nil {
  308. return err
  309. }
  310. if err := dec.Decode(&d); err != nil {
  311. return err
  312. }
  313. if isReadPartition {
  314. p.rreq = &sppb.ReadRequest{}
  315. err = proto.Unmarshal(d, p.rreq)
  316. } else {
  317. p.qreq = &sppb.ExecuteSqlRequest{}
  318. err = proto.Unmarshal(d, p.qreq)
  319. }
  320. return err
  321. }