You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

446 lines
14 KiB

  1. /*
  2. Copyright 2017 Google LLC
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package spanner
  14. import (
  15. "context"
  16. "fmt"
  17. "regexp"
  18. "sync/atomic"
  19. "time"
  20. "cloud.google.com/go/internal/version"
  21. "google.golang.org/api/option"
  22. gtransport "google.golang.org/api/transport/grpc"
  23. sppb "google.golang.org/genproto/googleapis/spanner/v1"
  24. "google.golang.org/grpc"
  25. "google.golang.org/grpc/codes"
  26. "google.golang.org/grpc/metadata"
  27. )
  28. const (
  29. endpoint = "spanner.googleapis.com:443"
  30. // resourcePrefixHeader is the name of the metadata header used to indicate
  31. // the resource being operated on.
  32. resourcePrefixHeader = "google-cloud-resource-prefix"
  33. // xGoogHeaderKey is the name of the metadata header used to indicate client
  34. // information.
  35. xGoogHeaderKey = "x-goog-api-client"
  36. )
  37. const (
  38. // Scope is the scope for Cloud Spanner Data API.
  39. Scope = "https://www.googleapis.com/auth/spanner.data"
  40. // AdminScope is the scope for Cloud Spanner Admin APIs.
  41. AdminScope = "https://www.googleapis.com/auth/spanner.admin"
  42. )
  43. var (
  44. validDBPattern = regexp.MustCompile("^projects/[^/]+/instances/[^/]+/databases/[^/]+$")
  45. xGoogHeaderVal = fmt.Sprintf("gl-go/%s gccl/%s grpc/%s", version.Go(), version.Repo, grpc.Version)
  46. )
  47. func validDatabaseName(db string) error {
  48. if matched := validDBPattern.MatchString(db); !matched {
  49. return fmt.Errorf("database name %q should conform to pattern %q",
  50. db, validDBPattern.String())
  51. }
  52. return nil
  53. }
  54. // Client is a client for reading and writing data to a Cloud Spanner database. A
  55. // client is safe to use concurrently, except for its Close method.
  56. type Client struct {
  57. // rr must be accessed through atomic operations.
  58. rr uint32
  59. // TODO(deklerk): we should not keep multiple ClientConns / SpannerClients. Instead, we should
  60. // have a single ClientConn that has many connections: https://github.com/googleapis/google-api-go-client/blob/003c13302b3ea5ae44344459ba080364bd46155f/internal/pool.go
  61. conns []*grpc.ClientConn
  62. clients []sppb.SpannerClient
  63. database string
  64. // Metadata to be sent with each request.
  65. md metadata.MD
  66. idleSessions *sessionPool
  67. // sessionLabels for the sessions created by this client.
  68. sessionLabels map[string]string
  69. }
  70. // ClientConfig has configurations for the client.
  71. type ClientConfig struct {
  72. // NumChannels is the number of gRPC channels.
  73. // If zero, a reasonable default is used based on the execution environment.
  74. NumChannels int
  75. // SessionPoolConfig is the configuration for session pool.
  76. SessionPoolConfig
  77. // SessionLabels for the sessions created by this client.
  78. // See https://cloud.google.com/spanner/docs/reference/rpc/google.spanner.v1#session for more info.
  79. SessionLabels map[string]string
  80. }
  81. // errDial returns error for dialing to Cloud Spanner.
  82. func errDial(ci int, err error) error {
  83. e := toSpannerError(err).(*Error)
  84. e.decorate(fmt.Sprintf("dialing fails for channel[%v]", ci))
  85. return e
  86. }
  87. func contextWithOutgoingMetadata(ctx context.Context, md metadata.MD) context.Context {
  88. existing, ok := metadata.FromOutgoingContext(ctx)
  89. if ok {
  90. md = metadata.Join(existing, md)
  91. }
  92. return metadata.NewOutgoingContext(ctx, md)
  93. }
  94. // NewClient creates a client to a database. A valid database name has the
  95. // form projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID. It uses a default
  96. // configuration.
  97. func NewClient(ctx context.Context, database string, opts ...option.ClientOption) (*Client, error) {
  98. return NewClientWithConfig(ctx, database, ClientConfig{}, opts...)
  99. }
  100. // NewClientWithConfig creates a client to a database. A valid database name has the
  101. // form projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID.
  102. func NewClientWithConfig(ctx context.Context, database string, config ClientConfig, opts ...option.ClientOption) (c *Client, err error) {
  103. ctx = startSpan(ctx, "cloud.google.com/go/spanner.NewClient")
  104. defer func() { endSpan(ctx, err) }()
  105. // Validate database path.
  106. if err := validDatabaseName(database); err != nil {
  107. return nil, err
  108. }
  109. c = &Client{
  110. database: database,
  111. md: metadata.Pairs(
  112. resourcePrefixHeader, database,
  113. xGoogHeaderKey, xGoogHeaderVal),
  114. }
  115. // Make a copy of labels.
  116. c.sessionLabels = make(map[string]string)
  117. for k, v := range config.SessionLabels {
  118. c.sessionLabels[k] = v
  119. }
  120. // gRPC options
  121. allOpts := []option.ClientOption{
  122. option.WithEndpoint(endpoint),
  123. option.WithScopes(Scope),
  124. option.WithGRPCDialOption(
  125. grpc.WithDefaultCallOptions(
  126. grpc.MaxCallSendMsgSize(100<<20),
  127. grpc.MaxCallRecvMsgSize(100<<20),
  128. ),
  129. ),
  130. }
  131. allOpts = append(allOpts, opts...)
  132. // Prepare gRPC channels.
  133. if config.NumChannels == 0 {
  134. config.NumChannels = numChannels
  135. }
  136. // Default configs for session pool.
  137. if config.MaxOpened == 0 {
  138. config.MaxOpened = uint64(config.NumChannels * 100)
  139. }
  140. if config.MaxBurst == 0 {
  141. config.MaxBurst = 10
  142. }
  143. // TODO(deklerk) This should be replaced with a balancer with config.NumChannels
  144. // connections, instead of config.NumChannels clientconns.
  145. for i := 0; i < config.NumChannels; i++ {
  146. conn, err := gtransport.Dial(ctx, allOpts...)
  147. if err != nil {
  148. return nil, errDial(i, err)
  149. }
  150. c.conns = append(c.conns, conn)
  151. c.clients = append(c.clients, sppb.NewSpannerClient(conn))
  152. }
  153. // Prepare session pool.
  154. config.SessionPoolConfig.getRPCClient = func() (sppb.SpannerClient, error) {
  155. // TODO: support more loadbalancing options.
  156. return c.rrNext(), nil
  157. }
  158. config.SessionPoolConfig.sessionLabels = c.sessionLabels
  159. sp, err := newSessionPool(database, config.SessionPoolConfig, c.md)
  160. if err != nil {
  161. c.Close()
  162. return nil, err
  163. }
  164. c.idleSessions = sp
  165. return c, nil
  166. }
  167. // rrNext returns the next available Cloud Spanner RPC client in a round-robin manner.
  168. func (c *Client) rrNext() sppb.SpannerClient {
  169. return c.clients[atomic.AddUint32(&c.rr, 1)%uint32(len(c.clients))]
  170. }
  171. // Close closes the client.
  172. func (c *Client) Close() {
  173. if c.idleSessions != nil {
  174. c.idleSessions.close()
  175. }
  176. for _, conn := range c.conns {
  177. conn.Close()
  178. }
  179. }
  180. // Single provides a read-only snapshot transaction optimized for the case
  181. // where only a single read or query is needed. This is more efficient than
  182. // using ReadOnlyTransaction() for a single read or query.
  183. //
  184. // Single will use a strong TimestampBound by default. Use
  185. // ReadOnlyTransaction.WithTimestampBound to specify a different
  186. // TimestampBound. A non-strong bound can be used to reduce latency, or
  187. // "time-travel" to prior versions of the database, see the documentation of
  188. // TimestampBound for details.
  189. func (c *Client) Single() *ReadOnlyTransaction {
  190. t := &ReadOnlyTransaction{singleUse: true, sp: c.idleSessions}
  191. t.txReadOnly.txReadEnv = t
  192. return t
  193. }
  194. // ReadOnlyTransaction returns a ReadOnlyTransaction that can be used for
  195. // multiple reads from the database. You must call Close() when the
  196. // ReadOnlyTransaction is no longer needed to release resources on the server.
  197. //
  198. // ReadOnlyTransaction will use a strong TimestampBound by default. Use
  199. // ReadOnlyTransaction.WithTimestampBound to specify a different
  200. // TimestampBound. A non-strong bound can be used to reduce latency, or
  201. // "time-travel" to prior versions of the database, see the documentation of
  202. // TimestampBound for details.
  203. func (c *Client) ReadOnlyTransaction() *ReadOnlyTransaction {
  204. t := &ReadOnlyTransaction{
  205. singleUse: false,
  206. sp: c.idleSessions,
  207. txReadyOrClosed: make(chan struct{}),
  208. }
  209. t.txReadOnly.txReadEnv = t
  210. return t
  211. }
  212. // BatchReadOnlyTransaction returns a BatchReadOnlyTransaction that can be used
  213. // for partitioned reads or queries from a snapshot of the database. This is
  214. // useful in batch processing pipelines where one wants to divide the work of
  215. // reading from the database across multiple machines.
  216. //
  217. // Note: This transaction does not use the underlying session pool but creates a
  218. // new session each time, and the session is reused across clients.
  219. //
  220. // You should call Close() after the txn is no longer needed on local
  221. // client, and call Cleanup() when the txn is finished for all clients, to free
  222. // the session.
  223. func (c *Client) BatchReadOnlyTransaction(ctx context.Context, tb TimestampBound) (*BatchReadOnlyTransaction, error) {
  224. var (
  225. tx transactionID
  226. rts time.Time
  227. s *session
  228. sh *sessionHandle
  229. err error
  230. )
  231. defer func() {
  232. if err != nil && sh != nil {
  233. s.delete(ctx)
  234. }
  235. }()
  236. // create session
  237. sc := c.rrNext()
  238. s, err = createSession(ctx, sc, c.database, c.sessionLabels, c.md)
  239. if err != nil {
  240. return nil, err
  241. }
  242. sh = &sessionHandle{session: s}
  243. // begin transaction
  244. err = runRetryable(contextWithOutgoingMetadata(ctx, sh.getMetadata()), func(ctx context.Context) error {
  245. res, e := sh.getClient().BeginTransaction(ctx, &sppb.BeginTransactionRequest{
  246. Session: sh.getID(),
  247. Options: &sppb.TransactionOptions{
  248. Mode: &sppb.TransactionOptions_ReadOnly_{
  249. ReadOnly: buildTransactionOptionsReadOnly(tb, true),
  250. },
  251. },
  252. })
  253. if e != nil {
  254. return e
  255. }
  256. tx = res.Id
  257. if res.ReadTimestamp != nil {
  258. rts = time.Unix(res.ReadTimestamp.Seconds, int64(res.ReadTimestamp.Nanos))
  259. }
  260. return nil
  261. })
  262. if err != nil {
  263. return nil, err
  264. }
  265. t := &BatchReadOnlyTransaction{
  266. ReadOnlyTransaction: ReadOnlyTransaction{
  267. tx: tx,
  268. txReadyOrClosed: make(chan struct{}),
  269. state: txActive,
  270. sh: sh,
  271. rts: rts,
  272. },
  273. ID: BatchReadOnlyTransactionID{
  274. tid: tx,
  275. sid: sh.getID(),
  276. rts: rts,
  277. },
  278. }
  279. t.txReadOnly.txReadEnv = t
  280. return t, nil
  281. }
  282. // BatchReadOnlyTransactionFromID reconstruct a BatchReadOnlyTransaction from BatchReadOnlyTransactionID
  283. func (c *Client) BatchReadOnlyTransactionFromID(tid BatchReadOnlyTransactionID) *BatchReadOnlyTransaction {
  284. sc := c.rrNext()
  285. s := &session{valid: true, client: sc, id: tid.sid, createTime: time.Now(), md: c.md}
  286. sh := &sessionHandle{session: s}
  287. t := &BatchReadOnlyTransaction{
  288. ReadOnlyTransaction: ReadOnlyTransaction{
  289. tx: tid.tid,
  290. txReadyOrClosed: make(chan struct{}),
  291. state: txActive,
  292. sh: sh,
  293. rts: tid.rts,
  294. },
  295. ID: tid,
  296. }
  297. t.txReadOnly.txReadEnv = t
  298. return t
  299. }
  300. type transactionInProgressKey struct{}
  301. func checkNestedTxn(ctx context.Context) error {
  302. if ctx.Value(transactionInProgressKey{}) != nil {
  303. return spannerErrorf(codes.FailedPrecondition, "Cloud Spanner does not support nested transactions")
  304. }
  305. return nil
  306. }
  307. // ReadWriteTransaction executes a read-write transaction, with retries as
  308. // necessary.
  309. //
  310. // The function f will be called one or more times. It must not maintain
  311. // any state between calls.
  312. //
  313. // If the transaction cannot be committed or if f returns an ABORTED error,
  314. // ReadWriteTransaction will call f again. It will continue to call f until the
  315. // transaction can be committed or the Context times out or is cancelled. If f
  316. // returns an error other than ABORTED, ReadWriteTransaction will abort the
  317. // transaction and return the error.
  318. //
  319. // To limit the number of retries, set a deadline on the Context rather than
  320. // using a fixed limit on the number of attempts. ReadWriteTransaction will
  321. // retry as needed until that deadline is met.
  322. //
  323. // See https://godoc.org/cloud.google.com/go/spanner#ReadWriteTransaction for
  324. // more details.
  325. func (c *Client) ReadWriteTransaction(ctx context.Context, f func(context.Context, *ReadWriteTransaction) error) (commitTimestamp time.Time, err error) {
  326. ctx = startSpan(ctx, "cloud.google.com/go/spanner.ReadWriteTransaction")
  327. defer func() { endSpan(ctx, err) }()
  328. if err := checkNestedTxn(ctx); err != nil {
  329. return time.Time{}, err
  330. }
  331. var (
  332. ts time.Time
  333. sh *sessionHandle
  334. )
  335. err = runRetryableNoWrap(ctx, func(ctx context.Context) error {
  336. var (
  337. err error
  338. t *ReadWriteTransaction
  339. )
  340. if sh == nil || sh.getID() == "" || sh.getClient() == nil {
  341. // Session handle hasn't been allocated or has been destroyed.
  342. sh, err = c.idleSessions.takeWriteSession(ctx)
  343. if err != nil {
  344. // If session retrieval fails, just fail the transaction.
  345. return err
  346. }
  347. t = &ReadWriteTransaction{
  348. sh: sh,
  349. tx: sh.getTransactionID(),
  350. }
  351. } else {
  352. t = &ReadWriteTransaction{
  353. sh: sh,
  354. }
  355. }
  356. t.txReadOnly.txReadEnv = t
  357. statsPrintf(ctx, map[string]interface{}{"transactionID": string(sh.getTransactionID())},
  358. "Starting transaction attempt")
  359. if err = t.begin(ctx); err != nil {
  360. // Mask error from begin operation as retryable error.
  361. return errRetry(err)
  362. }
  363. ts, err = t.runInTransaction(ctx, f)
  364. return err
  365. })
  366. if sh != nil {
  367. sh.recycle()
  368. }
  369. return ts, err
  370. }
  371. // applyOption controls the behavior of Client.Apply.
  372. type applyOption struct {
  373. // If atLeastOnce == true, Client.Apply will execute the mutations on Cloud Spanner at least once.
  374. atLeastOnce bool
  375. }
  376. // An ApplyOption is an optional argument to Apply.
  377. type ApplyOption func(*applyOption)
  378. // ApplyAtLeastOnce returns an ApplyOption that removes replay protection.
  379. //
  380. // With this option, Apply may attempt to apply mutations more than once; if
  381. // the mutations are not idempotent, this may lead to a failure being reported
  382. // when the mutation was applied more than once. For example, an insert may
  383. // fail with ALREADY_EXISTS even though the row did not exist before Apply was
  384. // called. For this reason, most users of the library will prefer not to use
  385. // this option. However, ApplyAtLeastOnce requires only a single RPC, whereas
  386. // Apply's default replay protection may require an additional RPC. So this
  387. // option may be appropriate for latency sensitive and/or high throughput blind
  388. // writing.
  389. func ApplyAtLeastOnce() ApplyOption {
  390. return func(ao *applyOption) {
  391. ao.atLeastOnce = true
  392. }
  393. }
  394. // Apply applies a list of mutations atomically to the database.
  395. func (c *Client) Apply(ctx context.Context, ms []*Mutation, opts ...ApplyOption) (commitTimestamp time.Time, err error) {
  396. ao := &applyOption{}
  397. for _, opt := range opts {
  398. opt(ao)
  399. }
  400. if !ao.atLeastOnce {
  401. return c.ReadWriteTransaction(ctx, func(ctx context.Context, t *ReadWriteTransaction) error {
  402. return t.BufferWrite(ms)
  403. })
  404. }
  405. ctx = startSpan(ctx, "cloud.google.com/go/spanner.Apply")
  406. defer func() { endSpan(ctx, err) }()
  407. t := &writeOnlyTransaction{c.idleSessions}
  408. return t.applyAtLeastOnce(ctx, ms...)
  409. }