You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

454 lines
14 KiB

  1. /*
  2. Copyright 2017 Google LLC
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package spanner
  14. import (
  15. "fmt"
  16. "log"
  17. "regexp"
  18. "sync/atomic"
  19. "time"
  20. "cloud.google.com/go/internal/version"
  21. "golang.org/x/net/context"
  22. "google.golang.org/api/option"
  23. gtransport "google.golang.org/api/transport/grpc"
  24. sppb "google.golang.org/genproto/googleapis/spanner/v1"
  25. "google.golang.org/grpc"
  26. "google.golang.org/grpc/codes"
  27. "google.golang.org/grpc/metadata"
  28. )
const (
	// endpoint is the host:port of the Cloud Spanner service that
	// NewClientWithConfig passes to option.WithEndpoint as its default.
	endpoint = "spanner.googleapis.com:443"
	// resourcePrefixHeader is the name of the metadata header used to indicate
	// the resource being operated on. NewClientWithConfig sets its value to the
	// database path.
	resourcePrefixHeader = "google-cloud-resource-prefix"
	// xGoogHeaderKey is the name of the metadata header used to indicate client
	// information. Its value is xGoogHeaderVal.
	xGoogHeaderKey = "x-goog-api-client"
)
const (
	// Scope is the scope for Cloud Spanner Data API. NewClientWithConfig
	// passes it to option.WithScopes as one of its default options.
	Scope = "https://www.googleapis.com/auth/spanner.data"
	// AdminScope is the scope for Cloud Spanner Admin APIs.
	AdminScope = "https://www.googleapis.com/auth/spanner.admin"
)
var (
	// validDBPattern matches a complete database path of the form
	// projects/<project>/instances/<instance>/databases/<database>, where each
	// of the three components is a non-empty run of characters other than '/'.
	validDBPattern = regexp.MustCompile("^projects/[^/]+/instances/[^/]+/databases/[^/]+$")
	// xGoogHeaderVal is the value sent under xGoogHeaderKey. It is assembled
	// once from version.Go(), version.Repo and grpc.Version.
	xGoogHeaderVal = fmt.Sprintf("gl-go/%s gccl/%s grpc/%s", version.Go(), version.Repo, grpc.Version)
)
  48. func validDatabaseName(db string) error {
  49. if matched := validDBPattern.MatchString(db); !matched {
  50. return fmt.Errorf("database name %q should conform to pattern %q",
  51. db, validDBPattern.String())
  52. }
  53. return nil
  54. }
// Client is a client for reading and writing data to a Cloud Spanner database. A
// client is safe to use concurrently, except for its Close method.
type Client struct {
	// rr must be accessed through atomic operations. rrNext increments it to
	// choose the next entry of clients in round-robin order.
	rr uint32
	// conns holds the gRPC connections dialed by NewClientWithConfig, one per
	// channel. Close closes each of them.
	conns []*grpc.ClientConn
	// clients holds one SpannerClient per entry of conns, created from that
	// connection and stored in the same order.
	clients []sppb.SpannerClient
	// database is the database path this client was created for; it has
	// passed validDatabaseName.
	database string
	// Metadata to be sent with each request. NewClientWithConfig fills it with
	// the resource prefix header (the database path) and the
	// x-goog-api-client header.
	md metadata.MD
	// idleSessions is the session pool created by NewClientWithConfig. Single,
	// ReadOnlyTransaction, ReadWriteTransaction and Apply hand it to (or take
	// sessions from it for) the transactions they create; Close closes it.
	idleSessions *sessionPool
	// sessionLabels for the sessions created by this client. It is a copy of
	// ClientConfig.SessionLabels made by NewClientWithConfig.
	sessionLabels map[string]string
}
// ClientConfig has configurations for the client.
type ClientConfig struct {
	// NumChannels is the number of gRPC channels.
	// If zero, a reasonable default is used based on the execution environment.
	NumChannels int
	// NOTE(review): co is not read anywhere in the visible part of this file;
	// confirm whether it is still used before relying on it.
	co []option.ClientOption
	// SessionPoolConfig is the configuration for session pool. When MaxOpened
	// is zero, NewClientWithConfig sets it to 100 sessions per channel; when
	// MaxBurst is zero, it sets it to 10.
	SessionPoolConfig
	// SessionLabels for the sessions created by this client.
	// See https://cloud.google.com/spanner/docs/reference/rpc/google.spanner.v1#session for more info.
	// NewClientWithConfig copies the map rather than retaining it.
	SessionLabels map[string]string
}
  81. // errDial returns error for dialing to Cloud Spanner.
  82. func errDial(ci int, err error) error {
  83. e := toSpannerError(err).(*Error)
  84. e.decorate(fmt.Sprintf("dialing fails for channel[%v]", ci))
  85. return e
  86. }
  87. func contextWithOutgoingMetadata(ctx context.Context, md metadata.MD) context.Context {
  88. existing, ok := metadata.FromOutgoingContext(ctx)
  89. if ok {
  90. md = metadata.Join(existing, md)
  91. }
  92. return metadata.NewOutgoingContext(ctx, md)
  93. }
  94. // NewClient creates a client to a database. A valid database name has the
  95. // form projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID. It uses a default
  96. // configuration.
  97. func NewClient(ctx context.Context, database string, opts ...option.ClientOption) (*Client, error) {
  98. return NewClientWithConfig(ctx, database, ClientConfig{}, opts...)
  99. }
  100. // NewClientWithConfig creates a client to a database. A valid database name has the
  101. // form projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID.
  102. func NewClientWithConfig(ctx context.Context, database string, config ClientConfig, opts ...option.ClientOption) (c *Client, err error) {
  103. ctx = traceStartSpan(ctx, "cloud.google.com/go/spanner.NewClient")
  104. defer func() { traceEndSpan(ctx, err) }()
  105. // Validate database path.
  106. if err := validDatabaseName(database); err != nil {
  107. return nil, err
  108. }
  109. c = &Client{
  110. database: database,
  111. md: metadata.Pairs(
  112. resourcePrefixHeader, database,
  113. xGoogHeaderKey, xGoogHeaderVal),
  114. }
  115. // Make a copy of labels.
  116. c.sessionLabels = make(map[string]string)
  117. for k, v := range config.SessionLabels {
  118. c.sessionLabels[k] = v
  119. }
  120. // gRPC options
  121. allOpts := []option.ClientOption{
  122. option.WithEndpoint(endpoint),
  123. option.WithScopes(Scope),
  124. option.WithGRPCDialOption(
  125. grpc.WithDefaultCallOptions(
  126. grpc.MaxCallSendMsgSize(100<<20),
  127. grpc.MaxCallRecvMsgSize(100<<20),
  128. ),
  129. ),
  130. }
  131. allOpts = append(allOpts, opts...)
  132. // Prepare gRPC channels.
  133. if config.NumChannels == 0 {
  134. config.NumChannels = numChannels
  135. }
  136. // Default configs for session pool.
  137. if config.MaxOpened == 0 {
  138. config.MaxOpened = uint64(config.NumChannels * 100)
  139. }
  140. if config.MaxBurst == 0 {
  141. config.MaxBurst = 10
  142. }
  143. for i := 0; i < config.NumChannels; i++ {
  144. conn, err := gtransport.Dial(ctx, allOpts...)
  145. if err != nil {
  146. return nil, errDial(i, err)
  147. }
  148. c.conns = append(c.conns, conn)
  149. c.clients = append(c.clients, sppb.NewSpannerClient(conn))
  150. }
  151. // Prepare session pool.
  152. config.SessionPoolConfig.getRPCClient = func() (sppb.SpannerClient, error) {
  153. // TODO: support more loadbalancing options.
  154. return c.rrNext(), nil
  155. }
  156. config.SessionPoolConfig.sessionLabels = c.sessionLabels
  157. sp, err := newSessionPool(database, config.SessionPoolConfig, c.md)
  158. if err != nil {
  159. c.Close()
  160. return nil, err
  161. }
  162. c.idleSessions = sp
  163. return c, nil
  164. }
  165. // rrNext returns the next available Cloud Spanner RPC client in a round-robin manner.
  166. func (c *Client) rrNext() sppb.SpannerClient {
  167. return c.clients[atomic.AddUint32(&c.rr, 1)%uint32(len(c.clients))]
  168. }
  169. // Close closes the client.
  170. func (c *Client) Close() {
  171. if c.idleSessions != nil {
  172. c.idleSessions.close()
  173. }
  174. for _, conn := range c.conns {
  175. conn.Close()
  176. }
  177. }
  178. // Single provides a read-only snapshot transaction optimized for the case
  179. // where only a single read or query is needed. This is more efficient than
  180. // using ReadOnlyTransaction() for a single read or query.
  181. //
  182. // Single will use a strong TimestampBound by default. Use
  183. // ReadOnlyTransaction.WithTimestampBound to specify a different
  184. // TimestampBound. A non-strong bound can be used to reduce latency, or
  185. // "time-travel" to prior versions of the database, see the documentation of
  186. // TimestampBound for details.
  187. func (c *Client) Single() *ReadOnlyTransaction {
  188. t := &ReadOnlyTransaction{singleUse: true, sp: c.idleSessions}
  189. t.txReadOnly.txReadEnv = t
  190. return t
  191. }
  192. // ReadOnlyTransaction returns a ReadOnlyTransaction that can be used for
  193. // multiple reads from the database. You must call Close() when the
  194. // ReadOnlyTransaction is no longer needed to release resources on the server.
  195. //
  196. // ReadOnlyTransaction will use a strong TimestampBound by default. Use
  197. // ReadOnlyTransaction.WithTimestampBound to specify a different
  198. // TimestampBound. A non-strong bound can be used to reduce latency, or
  199. // "time-travel" to prior versions of the database, see the documentation of
  200. // TimestampBound for details.
  201. func (c *Client) ReadOnlyTransaction() *ReadOnlyTransaction {
  202. t := &ReadOnlyTransaction{
  203. singleUse: false,
  204. sp: c.idleSessions,
  205. txReadyOrClosed: make(chan struct{}),
  206. }
  207. t.txReadOnly.txReadEnv = t
  208. return t
  209. }
  210. // BatchReadOnlyTransaction returns a BatchReadOnlyTransaction that can be used
  211. // for partitioned reads or queries from a snapshot of the database. This is
  212. // useful in batch processing pipelines where one wants to divide the work of
  213. // reading from the database across multiple machines.
  214. //
  215. // Note: This transaction does not use the underlying session pool but creates a
  216. // new session each time, and the session is reused across clients.
  217. //
  218. // You should call Close() after the txn is no longer needed on local
  219. // client, and call Cleanup() when the txn is finished for all clients, to free
  220. // the session.
  221. func (c *Client) BatchReadOnlyTransaction(ctx context.Context, tb TimestampBound) (*BatchReadOnlyTransaction, error) {
  222. var (
  223. tx transactionID
  224. rts time.Time
  225. s *session
  226. sh *sessionHandle
  227. err error
  228. )
  229. defer func() {
  230. if err != nil && sh != nil {
  231. e := runRetryable(ctx, func(ctx context.Context) error {
  232. _, e := s.client.DeleteSession(ctx, &sppb.DeleteSessionRequest{Name: s.getID()})
  233. return e
  234. })
  235. if e != nil {
  236. log.Printf("Failed to delete session %v. Error: %v", s.getID(), e)
  237. }
  238. }
  239. }()
  240. // create session
  241. sc := c.rrNext()
  242. err = runRetryable(ctx, func(ctx context.Context) error {
  243. sid, e := sc.CreateSession(ctx, &sppb.CreateSessionRequest{Database: c.database, Session: &sppb.Session{Labels: c.sessionLabels}})
  244. if e != nil {
  245. return e
  246. }
  247. // If no error, construct the new session.
  248. s = &session{valid: true, client: sc, id: sid.Name, createTime: time.Now(), md: c.md}
  249. return nil
  250. })
  251. if err != nil {
  252. return nil, err
  253. }
  254. sh = &sessionHandle{session: s}
  255. // begin transaction
  256. err = runRetryable(contextWithOutgoingMetadata(ctx, sh.getMetadata()), func(ctx context.Context) error {
  257. res, e := sh.getClient().BeginTransaction(ctx, &sppb.BeginTransactionRequest{
  258. Session: sh.getID(),
  259. Options: &sppb.TransactionOptions{
  260. Mode: &sppb.TransactionOptions_ReadOnly_{
  261. ReadOnly: buildTransactionOptionsReadOnly(tb, true),
  262. },
  263. },
  264. })
  265. if e != nil {
  266. return e
  267. }
  268. tx = res.Id
  269. if res.ReadTimestamp != nil {
  270. rts = time.Unix(res.ReadTimestamp.Seconds, int64(res.ReadTimestamp.Nanos))
  271. }
  272. return nil
  273. })
  274. if err != nil {
  275. return nil, err
  276. }
  277. t := &BatchReadOnlyTransaction{
  278. ReadOnlyTransaction: ReadOnlyTransaction{
  279. tx: tx,
  280. txReadyOrClosed: make(chan struct{}),
  281. state: txActive,
  282. sh: sh,
  283. rts: rts,
  284. },
  285. ID: BatchReadOnlyTransactionID{
  286. tid: tx,
  287. sid: sh.getID(),
  288. rts: rts,
  289. },
  290. }
  291. t.txReadOnly.txReadEnv = t
  292. return t, nil
  293. }
  294. // BatchReadOnlyTransactionFromID reconstruct a BatchReadOnlyTransaction from BatchReadOnlyTransactionID
  295. func (c *Client) BatchReadOnlyTransactionFromID(tid BatchReadOnlyTransactionID) *BatchReadOnlyTransaction {
  296. sc := c.rrNext()
  297. s := &session{valid: true, client: sc, id: tid.sid, createTime: time.Now(), md: c.md}
  298. sh := &sessionHandle{session: s}
  299. t := &BatchReadOnlyTransaction{
  300. ReadOnlyTransaction: ReadOnlyTransaction{
  301. tx: tid.tid,
  302. txReadyOrClosed: make(chan struct{}),
  303. state: txActive,
  304. sh: sh,
  305. rts: tid.rts,
  306. },
  307. ID: tid,
  308. }
  309. t.txReadOnly.txReadEnv = t
  310. return t
  311. }
  312. type transactionInProgressKey struct{}
  313. func checkNestedTxn(ctx context.Context) error {
  314. if ctx.Value(transactionInProgressKey{}) != nil {
  315. return spannerErrorf(codes.FailedPrecondition, "Cloud Spanner does not support nested transactions")
  316. }
  317. return nil
  318. }
  319. // ReadWriteTransaction executes a read-write transaction, with retries as
  320. // necessary.
  321. //
  322. // The function f will be called one or more times. It must not maintain
  323. // any state between calls.
  324. //
  325. // If the transaction cannot be committed or if f returns an IsAborted error,
  326. // ReadWriteTransaction will call f again. It will continue to call f until the
  327. // transaction can be committed or the Context times out or is cancelled. If f
  328. // returns an error other than IsAborted, ReadWriteTransaction will abort the
  329. // transaction and return the error.
  330. //
  331. // To limit the number of retries, set a deadline on the Context rather than
  332. // using a fixed limit on the number of attempts. ReadWriteTransaction will
  333. // retry as needed until that deadline is met.
  334. func (c *Client) ReadWriteTransaction(ctx context.Context, f func(context.Context, *ReadWriteTransaction) error) (commitTimestamp time.Time, err error) {
  335. ctx = traceStartSpan(ctx, "cloud.google.com/go/spanner.ReadWriteTransaction")
  336. defer func() { traceEndSpan(ctx, err) }()
  337. if err := checkNestedTxn(ctx); err != nil {
  338. return time.Time{}, err
  339. }
  340. var (
  341. ts time.Time
  342. sh *sessionHandle
  343. )
  344. err = runRetryableNoWrap(ctx, func(ctx context.Context) error {
  345. var (
  346. err error
  347. t *ReadWriteTransaction
  348. )
  349. if sh == nil || sh.getID() == "" || sh.getClient() == nil {
  350. // Session handle hasn't been allocated or has been destroyed.
  351. sh, err = c.idleSessions.takeWriteSession(ctx)
  352. if err != nil {
  353. // If session retrieval fails, just fail the transaction.
  354. return err
  355. }
  356. t = &ReadWriteTransaction{
  357. sh: sh,
  358. tx: sh.getTransactionID(),
  359. }
  360. } else {
  361. t = &ReadWriteTransaction{
  362. sh: sh,
  363. }
  364. }
  365. t.txReadOnly.txReadEnv = t
  366. tracePrintf(ctx, map[string]interface{}{"transactionID": string(sh.getTransactionID())},
  367. "Starting transaction attempt")
  368. if err = t.begin(ctx); err != nil {
  369. // Mask error from begin operation as retryable error.
  370. return errRetry(err)
  371. }
  372. ts, err = t.runInTransaction(ctx, f)
  373. return err
  374. })
  375. if sh != nil {
  376. sh.recycle()
  377. }
  378. return ts, err
  379. }
  380. // applyOption controls the behavior of Client.Apply.
  381. type applyOption struct {
  382. // If atLeastOnce == true, Client.Apply will execute the mutations on Cloud Spanner at least once.
  383. atLeastOnce bool
  384. }
  385. // An ApplyOption is an optional argument to Apply.
  386. type ApplyOption func(*applyOption)
  387. // ApplyAtLeastOnce returns an ApplyOption that removes replay protection.
  388. //
  389. // With this option, Apply may attempt to apply mutations more than once; if
  390. // the mutations are not idempotent, this may lead to a failure being reported
  391. // when the mutation was applied more than once. For example, an insert may
  392. // fail with ALREADY_EXISTS even though the row did not exist before Apply was
  393. // called. For this reason, most users of the library will prefer not to use
  394. // this option. However, ApplyAtLeastOnce requires only a single RPC, whereas
  395. // Apply's default replay protection may require an additional RPC. So this
  396. // option may be appropriate for latency sensitive and/or high throughput blind
  397. // writing.
  398. func ApplyAtLeastOnce() ApplyOption {
  399. return func(ao *applyOption) {
  400. ao.atLeastOnce = true
  401. }
  402. }
  403. // Apply applies a list of mutations atomically to the database.
  404. func (c *Client) Apply(ctx context.Context, ms []*Mutation, opts ...ApplyOption) (commitTimestamp time.Time, err error) {
  405. ao := &applyOption{}
  406. for _, opt := range opts {
  407. opt(ao)
  408. }
  409. if !ao.atLeastOnce {
  410. return c.ReadWriteTransaction(ctx, func(ctx context.Context, t *ReadWriteTransaction) error {
  411. return t.BufferWrite(ms)
  412. })
  413. }
  414. ctx = traceStartSpan(ctx, "cloud.google.com/go/spanner.Apply")
  415. defer func() { traceEndSpan(ctx, err) }()
  416. t := &writeOnlyTransaction{c.idleSessions}
  417. return t.applyAtLeastOnce(ctx, ms...)
  418. }