You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

915 lines
27 KiB

  1. /*
  2. Copyright 2015 Google LLC
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package bigtable // import "cloud.google.com/go/bigtable"
  14. import (
  15. "context"
  16. "errors"
  17. "fmt"
  18. "io"
  19. "strconv"
  20. "time"
  21. "cloud.google.com/go/bigtable/internal/gax"
  22. btopt "cloud.google.com/go/bigtable/internal/option"
  23. "github.com/golang/protobuf/proto"
  24. "google.golang.org/api/option"
  25. gtransport "google.golang.org/api/transport/grpc"
  26. btpb "google.golang.org/genproto/googleapis/bigtable/v2"
  27. "google.golang.org/grpc"
  28. "google.golang.org/grpc/codes"
  29. "google.golang.org/grpc/metadata"
  30. "google.golang.org/grpc/status"
  31. )
  32. const prodAddr = "bigtable.googleapis.com:443"
  33. // Client is a client for reading and writing data to tables in an instance.
  34. //
  35. // A Client is safe to use concurrently, except for its Close method.
  36. type Client struct {
  37. conn *grpc.ClientConn
  38. client btpb.BigtableClient
  39. project, instance string
  40. appProfile string
  41. }
  42. // ClientConfig has configurations for the client.
  43. type ClientConfig struct {
  44. // The id of the app profile to associate with all data operations sent from this client.
  45. // If unspecified, the default app profile for the instance will be used.
  46. AppProfile string
  47. }
  48. // NewClient creates a new Client for a given project and instance.
  49. // The default ClientConfig will be used.
  50. func NewClient(ctx context.Context, project, instance string, opts ...option.ClientOption) (*Client, error) {
  51. return NewClientWithConfig(ctx, project, instance, ClientConfig{}, opts...)
  52. }
  53. // NewClientWithConfig creates a new client with the given config.
  54. func NewClientWithConfig(ctx context.Context, project, instance string, config ClientConfig, opts ...option.ClientOption) (*Client, error) {
  55. o, err := btopt.DefaultClientOptions(prodAddr, Scope, clientUserAgent)
  56. if err != nil {
  57. return nil, err
  58. }
  59. // Default to a small connection pool that can be overridden.
  60. o = append(o,
  61. option.WithGRPCConnectionPool(4),
  62. // Set the max size to correspond to server-side limits.
  63. option.WithGRPCDialOption(grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(1<<28), grpc.MaxCallRecvMsgSize(1<<28))),
  64. // TODO(grpc/grpc-go#1388) using connection pool without WithBlock
  65. // can cause RPCs to fail randomly. We can delete this after the issue is fixed.
  66. option.WithGRPCDialOption(grpc.WithBlock()))
  67. o = append(o, opts...)
  68. conn, err := gtransport.Dial(ctx, o...)
  69. if err != nil {
  70. return nil, fmt.Errorf("dialing: %v", err)
  71. }
  72. return &Client{
  73. conn: conn,
  74. client: btpb.NewBigtableClient(conn),
  75. project: project,
  76. instance: instance,
  77. appProfile: config.AppProfile,
  78. }, nil
  79. }
  80. // Close closes the Client.
  81. func (c *Client) Close() error {
  82. return c.conn.Close()
  83. }
  84. var (
  85. idempotentRetryCodes = []codes.Code{codes.DeadlineExceeded, codes.Unavailable, codes.Aborted}
  86. isIdempotentRetryCode = make(map[codes.Code]bool)
  87. retryOptions = []gax.CallOption{
  88. gax.WithDelayTimeoutSettings(100*time.Millisecond, 2000*time.Millisecond, 1.2),
  89. gax.WithRetryCodes(idempotentRetryCodes),
  90. }
  91. )
  92. func init() {
  93. for _, code := range idempotentRetryCodes {
  94. isIdempotentRetryCode[code] = true
  95. }
  96. }
  97. func (c *Client) fullTableName(table string) string {
  98. return fmt.Sprintf("projects/%s/instances/%s/tables/%s", c.project, c.instance, table)
  99. }
  100. // A Table refers to a table.
  101. //
  102. // A Table is safe to use concurrently.
  103. type Table struct {
  104. c *Client
  105. table string
  106. // Metadata to be sent with each request.
  107. md metadata.MD
  108. }
  109. // Open opens a table.
  110. func (c *Client) Open(table string) *Table {
  111. return &Table{
  112. c: c,
  113. table: table,
  114. md: metadata.Pairs(resourcePrefixHeader, c.fullTableName(table)),
  115. }
  116. }
  117. // TODO(dsymonds): Read method that returns a sequence of ReadItems.
  118. // ReadRows reads rows from a table. f is called for each row.
  119. // If f returns false, the stream is shut down and ReadRows returns.
  120. // f owns its argument, and f is called serially in order by row key.
  121. //
  122. // By default, the yielded rows will contain all values in all cells.
  123. // Use RowFilter to limit the cells returned.
  124. func (t *Table) ReadRows(ctx context.Context, arg RowSet, f func(Row) bool, opts ...ReadOption) error {
  125. ctx = mergeOutgoingMetadata(ctx, t.md)
  126. var prevRowKey string
  127. var err error
  128. ctx = traceStartSpan(ctx, "cloud.google.com/go/bigtable.ReadRows")
  129. defer func() { traceEndSpan(ctx, err) }()
  130. attrMap := make(map[string]interface{})
  131. err = gax.Invoke(ctx, func(ctx context.Context) error {
  132. if !arg.valid() {
  133. // Empty row set, no need to make an API call.
  134. // NOTE: we must return early if arg == RowList{} because reading
  135. // an empty RowList from bigtable returns all rows from that table.
  136. return nil
  137. }
  138. req := &btpb.ReadRowsRequest{
  139. TableName: t.c.fullTableName(t.table),
  140. AppProfileId: t.c.appProfile,
  141. Rows: arg.proto(),
  142. }
  143. for _, opt := range opts {
  144. opt.set(req)
  145. }
  146. ctx, cancel := context.WithCancel(ctx) // for aborting the stream
  147. defer cancel()
  148. startTime := time.Now()
  149. stream, err := t.c.client.ReadRows(ctx, req)
  150. if err != nil {
  151. return err
  152. }
  153. cr := newChunkReader()
  154. for {
  155. res, err := stream.Recv()
  156. if err == io.EOF {
  157. break
  158. }
  159. if err != nil {
  160. // Reset arg for next Invoke call.
  161. arg = arg.retainRowsAfter(prevRowKey)
  162. attrMap["rowKey"] = prevRowKey
  163. attrMap["error"] = err.Error()
  164. attrMap["time_secs"] = time.Since(startTime).Seconds()
  165. tracePrintf(ctx, attrMap, "Retry details in ReadRows")
  166. return err
  167. }
  168. attrMap["time_secs"] = time.Since(startTime).Seconds()
  169. attrMap["rowCount"] = len(res.Chunks)
  170. tracePrintf(ctx, attrMap, "Details in ReadRows")
  171. for _, cc := range res.Chunks {
  172. row, err := cr.Process(cc)
  173. if err != nil {
  174. // No need to prepare for a retry, this is an unretryable error.
  175. return err
  176. }
  177. if row == nil {
  178. continue
  179. }
  180. prevRowKey = row.Key()
  181. if !f(row) {
  182. // Cancel and drain stream.
  183. cancel()
  184. for {
  185. if _, err := stream.Recv(); err != nil {
  186. // The stream has ended. We don't return an error
  187. // because the caller has intentionally interrupted the scan.
  188. return nil
  189. }
  190. }
  191. }
  192. }
  193. if err := cr.Close(); err != nil {
  194. // No need to prepare for a retry, this is an unretryable error.
  195. return err
  196. }
  197. }
  198. return err
  199. }, retryOptions...)
  200. return err
  201. }
  202. // ReadRow is a convenience implementation of a single-row reader.
  203. // A missing row will return a zero-length map and a nil error.
  204. func (t *Table) ReadRow(ctx context.Context, row string, opts ...ReadOption) (Row, error) {
  205. var r Row
  206. err := t.ReadRows(ctx, SingleRow(row), func(rr Row) bool {
  207. r = rr
  208. return true
  209. }, opts...)
  210. return r, err
  211. }
  212. // decodeFamilyProto adds the cell data from f to the given row.
  213. func decodeFamilyProto(r Row, row string, f *btpb.Family) {
  214. fam := f.Name // does not have colon
  215. for _, col := range f.Columns {
  216. for _, cell := range col.Cells {
  217. ri := ReadItem{
  218. Row: row,
  219. Column: fam + ":" + string(col.Qualifier),
  220. Timestamp: Timestamp(cell.TimestampMicros),
  221. Value: cell.Value,
  222. }
  223. r[fam] = append(r[fam], ri)
  224. }
  225. }
  226. }
  227. // RowSet is a set of rows to be read. It is satisfied by RowList, RowRange and RowRangeList.
  228. // The serialized size of the RowSet must be no larger than 1MiB.
  229. type RowSet interface {
  230. proto() *btpb.RowSet
  231. // retainRowsAfter returns a new RowSet that does not include the
  232. // given row key or any row key lexicographically less than it.
  233. retainRowsAfter(lastRowKey string) RowSet
  234. // Valid reports whether this set can cover at least one row.
  235. valid() bool
  236. }
  237. // RowList is a sequence of row keys.
  238. type RowList []string
  239. func (r RowList) proto() *btpb.RowSet {
  240. keys := make([][]byte, len(r))
  241. for i, row := range r {
  242. keys[i] = []byte(row)
  243. }
  244. return &btpb.RowSet{RowKeys: keys}
  245. }
  246. func (r RowList) retainRowsAfter(lastRowKey string) RowSet {
  247. var retryKeys RowList
  248. for _, key := range r {
  249. if key > lastRowKey {
  250. retryKeys = append(retryKeys, key)
  251. }
  252. }
  253. return retryKeys
  254. }
  255. func (r RowList) valid() bool {
  256. return len(r) > 0
  257. }
  258. // A RowRange is a half-open interval [Start, Limit) encompassing
  259. // all the rows with keys at least as large as Start, and less than Limit.
  260. // (Bigtable string comparison is the same as Go's.)
  261. // A RowRange can be unbounded, encompassing all keys at least as large as Start.
  262. type RowRange struct {
  263. start string
  264. limit string
  265. }
  266. // NewRange returns the new RowRange [begin, end).
  267. func NewRange(begin, end string) RowRange {
  268. return RowRange{
  269. start: begin,
  270. limit: end,
  271. }
  272. }
  273. // Unbounded tests whether a RowRange is unbounded.
  274. func (r RowRange) Unbounded() bool {
  275. return r.limit == ""
  276. }
  277. // Contains says whether the RowRange contains the key.
  278. func (r RowRange) Contains(row string) bool {
  279. return r.start <= row && (r.limit == "" || r.limit > row)
  280. }
  281. // String provides a printable description of a RowRange.
  282. func (r RowRange) String() string {
  283. a := strconv.Quote(r.start)
  284. if r.Unbounded() {
  285. return fmt.Sprintf("[%s,∞)", a)
  286. }
  287. return fmt.Sprintf("[%s,%q)", a, r.limit)
  288. }
  289. func (r RowRange) proto() *btpb.RowSet {
  290. rr := &btpb.RowRange{
  291. StartKey: &btpb.RowRange_StartKeyClosed{StartKeyClosed: []byte(r.start)},
  292. }
  293. if !r.Unbounded() {
  294. rr.EndKey = &btpb.RowRange_EndKeyOpen{EndKeyOpen: []byte(r.limit)}
  295. }
  296. return &btpb.RowSet{RowRanges: []*btpb.RowRange{rr}}
  297. }
  298. func (r RowRange) retainRowsAfter(lastRowKey string) RowSet {
  299. if lastRowKey == "" || lastRowKey < r.start {
  300. return r
  301. }
  302. // Set the beginning of the range to the row after the last scanned.
  303. start := lastRowKey + "\x00"
  304. if r.Unbounded() {
  305. return InfiniteRange(start)
  306. }
  307. return NewRange(start, r.limit)
  308. }
  309. func (r RowRange) valid() bool {
  310. return r.Unbounded() || r.start < r.limit
  311. }
  312. // RowRangeList is a sequence of RowRanges representing the union of the ranges.
  313. type RowRangeList []RowRange
  314. func (r RowRangeList) proto() *btpb.RowSet {
  315. ranges := make([]*btpb.RowRange, len(r))
  316. for i, rr := range r {
  317. // RowRange.proto() returns a RowSet with a single element RowRange array
  318. ranges[i] = rr.proto().RowRanges[0]
  319. }
  320. return &btpb.RowSet{RowRanges: ranges}
  321. }
  322. func (r RowRangeList) retainRowsAfter(lastRowKey string) RowSet {
  323. if lastRowKey == "" {
  324. return r
  325. }
  326. // Return a list of any range that has not yet been completely processed
  327. var ranges RowRangeList
  328. for _, rr := range r {
  329. retained := rr.retainRowsAfter(lastRowKey)
  330. if retained.valid() {
  331. ranges = append(ranges, retained.(RowRange))
  332. }
  333. }
  334. return ranges
  335. }
  336. func (r RowRangeList) valid() bool {
  337. for _, rr := range r {
  338. if rr.valid() {
  339. return true
  340. }
  341. }
  342. return false
  343. }
  344. // SingleRow returns a RowSet for reading a single row.
  345. func SingleRow(row string) RowSet {
  346. return RowList{row}
  347. }
  348. // PrefixRange returns a RowRange consisting of all keys starting with the prefix.
  349. func PrefixRange(prefix string) RowRange {
  350. return RowRange{
  351. start: prefix,
  352. limit: prefixSuccessor(prefix),
  353. }
  354. }
  355. // InfiniteRange returns the RowRange consisting of all keys at least as
  356. // large as start.
  357. func InfiniteRange(start string) RowRange {
  358. return RowRange{
  359. start: start,
  360. limit: "",
  361. }
  362. }
  363. // prefixSuccessor returns the lexically smallest string greater than the
  364. // prefix, if it exists, or "" otherwise. In either case, it is the string
  365. // needed for the Limit of a RowRange.
  366. func prefixSuccessor(prefix string) string {
  367. if prefix == "" {
  368. return "" // infinite range
  369. }
  370. n := len(prefix)
  371. for n--; n >= 0 && prefix[n] == '\xff'; n-- {
  372. }
  373. if n == -1 {
  374. return ""
  375. }
  376. ans := []byte(prefix[:n])
  377. ans = append(ans, prefix[n]+1)
  378. return string(ans)
  379. }
  380. // A ReadOption is an optional argument to ReadRows.
  381. type ReadOption interface {
  382. set(req *btpb.ReadRowsRequest)
  383. }
  384. // RowFilter returns a ReadOption that applies f to the contents of read rows.
  385. //
  386. // If multiple RowFilters are provided, only the last is used. To combine filters,
  387. // use ChainFilters or InterleaveFilters instead.
  388. func RowFilter(f Filter) ReadOption { return rowFilter{f} }
  389. type rowFilter struct{ f Filter }
  390. func (rf rowFilter) set(req *btpb.ReadRowsRequest) { req.Filter = rf.f.proto() }
  391. // LimitRows returns a ReadOption that will limit the number of rows to be read.
  392. func LimitRows(limit int64) ReadOption { return limitRows{limit} }
  393. type limitRows struct{ limit int64 }
  394. func (lr limitRows) set(req *btpb.ReadRowsRequest) { req.RowsLimit = lr.limit }
  395. // mutationsAreRetryable returns true if all mutations are idempotent
  396. // and therefore retryable. A mutation is idempotent iff all cell timestamps
  397. // have an explicit timestamp set and do not rely on the timestamp being set on the server.
  398. func mutationsAreRetryable(muts []*btpb.Mutation) bool {
  399. serverTime := int64(ServerTime)
  400. for _, mut := range muts {
  401. setCell := mut.GetSetCell()
  402. if setCell != nil && setCell.TimestampMicros == serverTime {
  403. return false
  404. }
  405. }
  406. return true
  407. }
  408. const maxMutations = 100000
  409. // Apply mutates a row atomically. A mutation must contain at least one
  410. // operation and at most 100000 operations.
  411. func (t *Table) Apply(ctx context.Context, row string, m *Mutation, opts ...ApplyOption) error {
  412. ctx = mergeOutgoingMetadata(ctx, t.md)
  413. after := func(res proto.Message) {
  414. for _, o := range opts {
  415. o.after(res)
  416. }
  417. }
  418. var err error
  419. ctx = traceStartSpan(ctx, "cloud.google.com/go/bigtable/Apply")
  420. defer func() { traceEndSpan(ctx, err) }()
  421. var callOptions []gax.CallOption
  422. if m.cond == nil {
  423. req := &btpb.MutateRowRequest{
  424. TableName: t.c.fullTableName(t.table),
  425. AppProfileId: t.c.appProfile,
  426. RowKey: []byte(row),
  427. Mutations: m.ops,
  428. }
  429. if mutationsAreRetryable(m.ops) {
  430. callOptions = retryOptions
  431. }
  432. var res *btpb.MutateRowResponse
  433. err := gax.Invoke(ctx, func(ctx context.Context) error {
  434. var err error
  435. res, err = t.c.client.MutateRow(ctx, req)
  436. return err
  437. }, callOptions...)
  438. if err == nil {
  439. after(res)
  440. }
  441. return err
  442. }
  443. req := &btpb.CheckAndMutateRowRequest{
  444. TableName: t.c.fullTableName(t.table),
  445. AppProfileId: t.c.appProfile,
  446. RowKey: []byte(row),
  447. PredicateFilter: m.cond.proto(),
  448. }
  449. if m.mtrue != nil {
  450. if m.mtrue.cond != nil {
  451. return errors.New("bigtable: conditional mutations cannot be nested")
  452. }
  453. req.TrueMutations = m.mtrue.ops
  454. }
  455. if m.mfalse != nil {
  456. if m.mfalse.cond != nil {
  457. return errors.New("bigtable: conditional mutations cannot be nested")
  458. }
  459. req.FalseMutations = m.mfalse.ops
  460. }
  461. if mutationsAreRetryable(req.TrueMutations) && mutationsAreRetryable(req.FalseMutations) {
  462. callOptions = retryOptions
  463. }
  464. var cmRes *btpb.CheckAndMutateRowResponse
  465. err = gax.Invoke(ctx, func(ctx context.Context) error {
  466. var err error
  467. cmRes, err = t.c.client.CheckAndMutateRow(ctx, req)
  468. return err
  469. }, callOptions...)
  470. if err == nil {
  471. after(cmRes)
  472. }
  473. return err
  474. }
  475. // An ApplyOption is an optional argument to Apply.
  476. type ApplyOption interface {
  477. after(res proto.Message)
  478. }
  479. type applyAfterFunc func(res proto.Message)
  480. func (a applyAfterFunc) after(res proto.Message) { a(res) }
  481. // GetCondMutationResult returns an ApplyOption that reports whether the conditional
  482. // mutation's condition matched.
  483. func GetCondMutationResult(matched *bool) ApplyOption {
  484. return applyAfterFunc(func(res proto.Message) {
  485. if res, ok := res.(*btpb.CheckAndMutateRowResponse); ok {
  486. *matched = res.PredicateMatched
  487. }
  488. })
  489. }
  490. // Mutation represents a set of changes for a single row of a table.
  491. type Mutation struct {
  492. ops []*btpb.Mutation
  493. // for conditional mutations
  494. cond Filter
  495. mtrue, mfalse *Mutation
  496. }
  497. // NewMutation returns a new mutation.
  498. func NewMutation() *Mutation {
  499. return new(Mutation)
  500. }
  501. // NewCondMutation returns a conditional mutation.
  502. // The given row filter determines which mutation is applied:
  503. // If the filter matches any cell in the row, mtrue is applied;
  504. // otherwise, mfalse is applied.
  505. // Either given mutation may be nil.
  506. func NewCondMutation(cond Filter, mtrue, mfalse *Mutation) *Mutation {
  507. return &Mutation{cond: cond, mtrue: mtrue, mfalse: mfalse}
  508. }
  509. // Set sets a value in a specified column, with the given timestamp.
  510. // The timestamp will be truncated to millisecond granularity.
  511. // A timestamp of ServerTime means to use the server timestamp.
  512. func (m *Mutation) Set(family, column string, ts Timestamp, value []byte) {
  513. m.ops = append(m.ops, &btpb.Mutation{Mutation: &btpb.Mutation_SetCell_{SetCell: &btpb.Mutation_SetCell{
  514. FamilyName: family,
  515. ColumnQualifier: []byte(column),
  516. TimestampMicros: int64(ts.TruncateToMilliseconds()),
  517. Value: value,
  518. }}})
  519. }
  520. // DeleteCellsInColumn will delete all the cells whose columns are family:column.
  521. func (m *Mutation) DeleteCellsInColumn(family, column string) {
  522. m.ops = append(m.ops, &btpb.Mutation{Mutation: &btpb.Mutation_DeleteFromColumn_{DeleteFromColumn: &btpb.Mutation_DeleteFromColumn{
  523. FamilyName: family,
  524. ColumnQualifier: []byte(column),
  525. }}})
  526. }
  527. // DeleteTimestampRange deletes all cells whose columns are family:column
  528. // and whose timestamps are in the half-open interval [start, end).
  529. // If end is zero, it will be interpreted as infinity.
  530. // The timestamps will be truncated to millisecond granularity.
  531. func (m *Mutation) DeleteTimestampRange(family, column string, start, end Timestamp) {
  532. m.ops = append(m.ops, &btpb.Mutation{Mutation: &btpb.Mutation_DeleteFromColumn_{DeleteFromColumn: &btpb.Mutation_DeleteFromColumn{
  533. FamilyName: family,
  534. ColumnQualifier: []byte(column),
  535. TimeRange: &btpb.TimestampRange{
  536. StartTimestampMicros: int64(start.TruncateToMilliseconds()),
  537. EndTimestampMicros: int64(end.TruncateToMilliseconds()),
  538. },
  539. }}})
  540. }
  541. // DeleteCellsInFamily will delete all the cells whose columns are family:*.
  542. func (m *Mutation) DeleteCellsInFamily(family string) {
  543. m.ops = append(m.ops, &btpb.Mutation{Mutation: &btpb.Mutation_DeleteFromFamily_{DeleteFromFamily: &btpb.Mutation_DeleteFromFamily{
  544. FamilyName: family,
  545. }}})
  546. }
  547. // DeleteRow deletes the entire row.
  548. func (m *Mutation) DeleteRow() {
  549. m.ops = append(m.ops, &btpb.Mutation{Mutation: &btpb.Mutation_DeleteFromRow_{DeleteFromRow: &btpb.Mutation_DeleteFromRow{}}})
  550. }
  551. // entryErr is a container that combines an entry with the error that was returned for it.
  552. // Err may be nil if no error was returned for the Entry, or if the Entry has not yet been processed.
  553. type entryErr struct {
  554. Entry *btpb.MutateRowsRequest_Entry
  555. Err error
  556. }
  557. // ApplyBulk applies multiple Mutations, up to a maximum of 100,000.
  558. // Each mutation is individually applied atomically,
  559. // but the set of mutations may be applied in any order.
  560. //
  561. // Two types of failures may occur. If the entire process
  562. // fails, (nil, err) will be returned. If specific mutations
  563. // fail to apply, ([]err, nil) will be returned, and the errors
  564. // will correspond to the relevant rowKeys/muts arguments.
  565. //
  566. // Conditional mutations cannot be applied in bulk and providing one will result in an error.
  567. func (t *Table) ApplyBulk(ctx context.Context, rowKeys []string, muts []*Mutation, opts ...ApplyOption) ([]error, error) {
  568. ctx = mergeOutgoingMetadata(ctx, t.md)
  569. if len(rowKeys) != len(muts) {
  570. return nil, fmt.Errorf("mismatched rowKeys and mutation array lengths: %d, %d", len(rowKeys), len(muts))
  571. }
  572. origEntries := make([]*entryErr, len(rowKeys))
  573. for i, key := range rowKeys {
  574. mut := muts[i]
  575. if mut.cond != nil {
  576. return nil, errors.New("conditional mutations cannot be applied in bulk")
  577. }
  578. origEntries[i] = &entryErr{Entry: &btpb.MutateRowsRequest_Entry{RowKey: []byte(key), Mutations: mut.ops}}
  579. }
  580. var err error
  581. ctx = traceStartSpan(ctx, "cloud.google.com/go/bigtable/ApplyBulk")
  582. defer func() { traceEndSpan(ctx, err) }()
  583. for _, group := range groupEntries(origEntries, maxMutations) {
  584. attrMap := make(map[string]interface{})
  585. err = gax.Invoke(ctx, func(ctx context.Context) error {
  586. attrMap["rowCount"] = len(group)
  587. tracePrintf(ctx, attrMap, "Row count in ApplyBulk")
  588. err := t.doApplyBulk(ctx, group, opts...)
  589. if err != nil {
  590. // We want to retry the entire request with the current group
  591. return err
  592. }
  593. group = t.getApplyBulkRetries(group)
  594. if len(group) > 0 && len(idempotentRetryCodes) > 0 {
  595. // We have at least one mutation that needs to be retried.
  596. // Return an arbitrary error that is retryable according to callOptions.
  597. return status.Errorf(idempotentRetryCodes[0], "Synthetic error: partial failure of ApplyBulk")
  598. }
  599. return nil
  600. }, retryOptions...)
  601. if err != nil {
  602. return nil, err
  603. }
  604. }
  605. // Accumulate all of the errors into an array to return, interspersed with nils for successful
  606. // entries. The absence of any errors means we should return nil.
  607. var errs []error
  608. var foundErr bool
  609. for _, entry := range origEntries {
  610. if entry.Err != nil {
  611. foundErr = true
  612. }
  613. errs = append(errs, entry.Err)
  614. }
  615. if foundErr {
  616. return errs, nil
  617. }
  618. return nil, nil
  619. }
  620. // getApplyBulkRetries returns the entries that need to be retried
  621. func (t *Table) getApplyBulkRetries(entries []*entryErr) []*entryErr {
  622. var retryEntries []*entryErr
  623. for _, entry := range entries {
  624. err := entry.Err
  625. if err != nil && isIdempotentRetryCode[grpc.Code(err)] && mutationsAreRetryable(entry.Entry.Mutations) {
  626. // There was an error and the entry is retryable.
  627. retryEntries = append(retryEntries, entry)
  628. }
  629. }
  630. return retryEntries
  631. }
  632. // doApplyBulk does the work of a single ApplyBulk invocation
  633. func (t *Table) doApplyBulk(ctx context.Context, entryErrs []*entryErr, opts ...ApplyOption) error {
  634. after := func(res proto.Message) {
  635. for _, o := range opts {
  636. o.after(res)
  637. }
  638. }
  639. entries := make([]*btpb.MutateRowsRequest_Entry, len(entryErrs))
  640. for i, entryErr := range entryErrs {
  641. entries[i] = entryErr.Entry
  642. }
  643. req := &btpb.MutateRowsRequest{
  644. TableName: t.c.fullTableName(t.table),
  645. AppProfileId: t.c.appProfile,
  646. Entries: entries,
  647. }
  648. stream, err := t.c.client.MutateRows(ctx, req)
  649. if err != nil {
  650. return err
  651. }
  652. for {
  653. res, err := stream.Recv()
  654. if err == io.EOF {
  655. break
  656. }
  657. if err != nil {
  658. return err
  659. }
  660. for i, entry := range res.Entries {
  661. s := entry.Status
  662. if s.Code == int32(codes.OK) {
  663. entryErrs[i].Err = nil
  664. } else {
  665. entryErrs[i].Err = status.Errorf(codes.Code(s.Code), s.Message)
  666. }
  667. }
  668. after(res)
  669. }
  670. return nil
  671. }
  672. // groupEntries groups entries into groups of a specified size without breaking up
  673. // individual entries.
  674. func groupEntries(entries []*entryErr, maxSize int) [][]*entryErr {
  675. var (
  676. res [][]*entryErr
  677. start int
  678. gmuts int
  679. )
  680. addGroup := func(end int) {
  681. if end-start > 0 {
  682. res = append(res, entries[start:end])
  683. start = end
  684. gmuts = 0
  685. }
  686. }
  687. for i, e := range entries {
  688. emuts := len(e.Entry.Mutations)
  689. if gmuts+emuts > maxSize {
  690. addGroup(i)
  691. }
  692. gmuts += emuts
  693. }
  694. addGroup(len(entries))
  695. return res
  696. }
  697. // Timestamp is in units of microseconds since 1 January 1970.
  698. type Timestamp int64
  699. // ServerTime is a specific Timestamp that may be passed to (*Mutation).Set.
  700. // It indicates that the server's timestamp should be used.
  701. const ServerTime Timestamp = -1
  702. // Time converts a time.Time into a Timestamp.
  703. func Time(t time.Time) Timestamp { return Timestamp(t.UnixNano() / 1e3) }
  704. // Now returns the Timestamp representation of the current time on the client.
  705. func Now() Timestamp { return Time(time.Now()) }
  706. // Time converts a Timestamp into a time.Time.
  707. func (ts Timestamp) Time() time.Time { return time.Unix(0, int64(ts)*1e3) }
  708. // TruncateToMilliseconds truncates a Timestamp to millisecond granularity,
  709. // which is currently the only granularity supported.
  710. func (ts Timestamp) TruncateToMilliseconds() Timestamp {
  711. if ts == ServerTime {
  712. return ts
  713. }
  714. return ts - ts%1000
  715. }
  716. // ApplyReadModifyWrite applies a ReadModifyWrite to a specific row.
  717. // It returns the newly written cells.
  718. func (t *Table) ApplyReadModifyWrite(ctx context.Context, row string, m *ReadModifyWrite) (Row, error) {
  719. ctx = mergeOutgoingMetadata(ctx, t.md)
  720. req := &btpb.ReadModifyWriteRowRequest{
  721. TableName: t.c.fullTableName(t.table),
  722. AppProfileId: t.c.appProfile,
  723. RowKey: []byte(row),
  724. Rules: m.ops,
  725. }
  726. res, err := t.c.client.ReadModifyWriteRow(ctx, req)
  727. if err != nil {
  728. return nil, err
  729. }
  730. if res.Row == nil {
  731. return nil, errors.New("unable to apply ReadModifyWrite: res.Row=nil")
  732. }
  733. r := make(Row)
  734. for _, fam := range res.Row.Families { // res is *btpb.Row, fam is *btpb.Family
  735. decodeFamilyProto(r, row, fam)
  736. }
  737. return r, nil
  738. }
  739. // ReadModifyWrite represents a set of operations on a single row of a table.
  740. // It is like Mutation but for non-idempotent changes.
  741. // When applied, these operations operate on the latest values of the row's cells,
  742. // and result in a new value being written to the relevant cell with a timestamp
  743. // that is max(existing timestamp, current server time).
  744. //
  745. // The application of a ReadModifyWrite is atomic; concurrent ReadModifyWrites will
  746. // be executed serially by the server.
  747. type ReadModifyWrite struct {
  748. ops []*btpb.ReadModifyWriteRule
  749. }
  750. // NewReadModifyWrite returns a new ReadModifyWrite.
  751. func NewReadModifyWrite() *ReadModifyWrite { return new(ReadModifyWrite) }
  752. // AppendValue appends a value to a specific cell's value.
  753. // If the cell is unset, it will be treated as an empty value.
  754. func (m *ReadModifyWrite) AppendValue(family, column string, v []byte) {
  755. m.ops = append(m.ops, &btpb.ReadModifyWriteRule{
  756. FamilyName: family,
  757. ColumnQualifier: []byte(column),
  758. Rule: &btpb.ReadModifyWriteRule_AppendValue{AppendValue: v},
  759. })
  760. }
  761. // Increment interprets the value in a specific cell as a 64-bit big-endian signed integer,
  762. // and adds a value to it. If the cell is unset, it will be treated as zero.
  763. // If the cell is set and is not an 8-byte value, the entire ApplyReadModifyWrite
  764. // operation will fail.
  765. func (m *ReadModifyWrite) Increment(family, column string, delta int64) {
  766. m.ops = append(m.ops, &btpb.ReadModifyWriteRule{
  767. FamilyName: family,
  768. ColumnQualifier: []byte(column),
  769. Rule: &btpb.ReadModifyWriteRule_IncrementAmount{IncrementAmount: delta},
  770. })
  771. }
  772. // mergeOutgoingMetadata returns a context populated by the existing outgoing metadata,
  773. // if any, joined with internal metadata.
  774. func mergeOutgoingMetadata(ctx context.Context, md metadata.MD) context.Context {
  775. mdCopy, _ := metadata.FromOutgoingContext(ctx)
  776. return metadata.NewOutgoingContext(ctx, metadata.Join(mdCopy, md))
  777. }
  778. // SampleRowKeys returns a sample of row keys in the table. The returned row keys will delimit contiguous sections of
  779. // the table of approximately equal size, which can be used to break up the data for distributed tasks like mapreduces.
  780. func (t *Table) SampleRowKeys(ctx context.Context) ([]string, error) {
  781. ctx = mergeOutgoingMetadata(ctx, t.md)
  782. var sampledRowKeys []string
  783. err := gax.Invoke(ctx, func(ctx context.Context) error {
  784. sampledRowKeys = nil
  785. req := &btpb.SampleRowKeysRequest{
  786. TableName: t.c.fullTableName(t.table),
  787. AppProfileId: t.c.appProfile,
  788. }
  789. ctx, cancel := context.WithCancel(ctx) // for aborting the stream
  790. defer cancel()
  791. stream, err := t.c.client.SampleRowKeys(ctx, req)
  792. if err != nil {
  793. return err
  794. }
  795. for {
  796. res, err := stream.Recv()
  797. if err == io.EOF {
  798. break
  799. }
  800. if err != nil {
  801. return err
  802. }
  803. key := string(res.RowKey)
  804. if key == "" {
  805. continue
  806. }
  807. sampledRowKeys = append(sampledRowKeys, key)
  808. }
  809. return nil
  810. }, retryOptions...)
  811. return sampledRowKeys, err
  812. }