25'ten fazla konu seçemezsiniz Konular bir harf veya rakamla başlamalı, kısa çizgiler ('-') içerebilir ve en fazla 35 karakter uzunluğunda olabilir.
 
 
 

242 satır
6.2 KiB

  1. // Copyright 2016 Google Inc. All Rights Reserved.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. // Package bytestream provides a client for any service that exposes a ByteStream API.
  15. //
  16. // Note: This package is a work-in-progress. Backwards-incompatible changes should be expected.
  17. package bytestream
  18. // This file contains the client implementation of Bytestream declared at:
  19. // https://github.com/googleapis/googleapis/blob/master/google/bytestream/bytestream.proto
  20. import (
  21. "fmt"
  22. "math/rand"
  23. "time"
  24. "golang.org/x/net/context"
  25. "google.golang.org/grpc"
  26. pb "google.golang.org/genproto/googleapis/bytestream"
  27. )
  28. const (
  29. // MaxBufSize is the maximum buffer size (in bytes) received in a read chunk or sent in a write chunk.
  30. MaxBufSize = 2 * 1024 * 1024
  31. backoffBase = 10 * time.Millisecond
  32. backoffMax = 1 * time.Second
  33. maxTries = 5
  34. )
  35. // Client is the go wrapper around a ByteStreamClient and provides an interface to it.
  36. type Client struct {
  37. client pb.ByteStreamClient
  38. options []grpc.CallOption
  39. }
  40. // NewClient creates a new bytestream.Client.
  41. func NewClient(cc *grpc.ClientConn, options ...grpc.CallOption) *Client {
  42. return &Client{
  43. client: pb.NewByteStreamClient(cc),
  44. options: options,
  45. }
  46. }
  47. // Reader reads from a byte stream.
  48. type Reader struct {
  49. ctx context.Context
  50. c *Client
  51. readClient pb.ByteStream_ReadClient
  52. resourceName string
  53. err error
  54. buf []byte
  55. }
  56. // ResourceName gets the resource name this Reader is reading.
  57. func (r *Reader) ResourceName() string {
  58. return r.resourceName
  59. }
  60. // Read implements io.Reader.
  61. // Read buffers received bytes that do not fit in p.
  62. func (r *Reader) Read(p []byte) (int, error) {
  63. if r.err != nil {
  64. return 0, r.err
  65. }
  66. var backoffDelay time.Duration
  67. for tries := 0; len(r.buf) == 0 && tries < maxTries; tries++ {
  68. // No data in buffer.
  69. resp, err := r.readClient.Recv()
  70. if err != nil {
  71. r.err = err
  72. return 0, err
  73. }
  74. r.buf = resp.Data
  75. if len(r.buf) != 0 {
  76. break
  77. }
  78. // back off
  79. if backoffDelay < backoffBase {
  80. backoffDelay = backoffBase
  81. } else {
  82. backoffDelay = time.Duration(float64(backoffDelay) * 1.3 * (1 - 0.4*rand.Float64()))
  83. }
  84. if backoffDelay > backoffMax {
  85. backoffDelay = backoffMax
  86. }
  87. select {
  88. case <-time.After(backoffDelay):
  89. case <-r.ctx.Done():
  90. if err := r.ctx.Err(); err != nil {
  91. r.err = err
  92. }
  93. return 0, r.err
  94. }
  95. }
  96. // Copy from buffer.
  97. n := copy(p, r.buf)
  98. r.buf = r.buf[n:]
  99. return n, nil
  100. }
  101. // Close implements io.Closer.
  102. func (r *Reader) Close() error {
  103. if r.readClient == nil {
  104. return nil
  105. }
  106. err := r.readClient.CloseSend()
  107. r.readClient = nil
  108. return err
  109. }
  110. // NewReader creates a new Reader to read a resource.
  111. func (c *Client) NewReader(ctx context.Context, resourceName string) (*Reader, error) {
  112. return c.NewReaderAt(ctx, resourceName, 0)
  113. }
  114. // NewReader creates a new Reader to read a resource from the given offset.
  115. func (c *Client) NewReaderAt(ctx context.Context, resourceName string, offset int64) (*Reader, error) {
  116. // readClient is set up for Read(). ReadAt() will copy needed fields into its reentrantReader.
  117. readClient, err := c.client.Read(ctx, &pb.ReadRequest{
  118. ResourceName: resourceName,
  119. ReadOffset: offset,
  120. }, c.options...)
  121. if err != nil {
  122. return nil, err
  123. }
  124. return &Reader{
  125. ctx: ctx,
  126. c: c,
  127. resourceName: resourceName,
  128. readClient: readClient,
  129. }, nil
  130. }
  131. // Writer writes to a byte stream.
  132. type Writer struct {
  133. ctx context.Context
  134. writeClient pb.ByteStream_WriteClient
  135. resourceName string
  136. offset int64
  137. backoffDelay time.Duration
  138. err error
  139. }
  140. // ResourceName gets the resource name this Writer is writing.
  141. func (w *Writer) ResourceName() string {
  142. return w.resourceName
  143. }
  144. // Write implements io.Writer.
  145. func (w *Writer) Write(p []byte) (int, error) {
  146. if w.err != nil {
  147. return 0, w.err
  148. }
  149. n := 0
  150. for n < len(p) {
  151. bufSize := len(p) - n
  152. if bufSize > MaxBufSize {
  153. bufSize = MaxBufSize
  154. }
  155. r := pb.WriteRequest{
  156. WriteOffset: w.offset,
  157. FinishWrite: false,
  158. Data: p[n : n+bufSize],
  159. }
  160. // Bytestream only requires the resourceName to be sent in the first WriteRequest.
  161. if w.offset == 0 {
  162. r.ResourceName = w.resourceName
  163. }
  164. err := w.writeClient.Send(&r)
  165. if err != nil {
  166. w.err = err
  167. return n, err
  168. }
  169. w.offset += int64(bufSize)
  170. n += bufSize
  171. }
  172. return n, nil
  173. }
  174. // Close implements io.Closer. It is the caller's responsibility to call Close() when writing is done.
  175. func (w *Writer) Close() error {
  176. err := w.writeClient.Send(&pb.WriteRequest{
  177. ResourceName: w.resourceName,
  178. WriteOffset: w.offset,
  179. FinishWrite: true,
  180. Data: nil,
  181. })
  182. if err != nil {
  183. w.err = err
  184. return fmt.Errorf("Send(WriteRequest< FinishWrite >) failed: %v", err)
  185. }
  186. resp, err := w.writeClient.CloseAndRecv()
  187. if err != nil {
  188. w.err = err
  189. return fmt.Errorf("CloseAndRecv: %v", err)
  190. }
  191. if resp == nil {
  192. err = fmt.Errorf("expected a response on close, got %v", resp)
  193. } else if resp.CommittedSize != w.offset {
  194. err = fmt.Errorf("server only wrote %d bytes, want %d", resp.CommittedSize, w.offset)
  195. }
  196. w.err = err
  197. return err
  198. }
  199. // NewWriter creates a new Writer to write a resource.
  200. //
  201. // resourceName specifies the name of the resource.
  202. // The resource will be available after Close has been called.
  203. //
  204. // It is the caller's responsibility to call Close when writing is done.
  205. //
  206. // TODO: There is currently no way to resume a write. Maybe NewWriter should begin with a call to QueryWriteStatus.
  207. func (c *Client) NewWriter(ctx context.Context, resourceName string) (*Writer, error) {
  208. wc, err := c.client.Write(ctx, c.options...)
  209. if err != nil {
  210. return nil, err
  211. }
  212. return &Writer{
  213. ctx: ctx,
  214. writeClient: wc,
  215. resourceName: resourceName,
  216. }, nil
  217. }