// Copyright 2016 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // Package bytestream provides a client for any service that exposes a ByteStream API. // // Note: This package is a work-in-progress. Backwards-incompatible changes should be expected. package bytestream // This file contains the client implementation of Bytestream declared at: // https://github.com/googleapis/googleapis/blob/master/google/bytestream/bytestream.proto import ( "context" "fmt" "math/rand" "time" "google.golang.org/grpc" pb "google.golang.org/genproto/googleapis/bytestream" ) const ( // MaxBufSize is the maximum buffer size (in bytes) received in a read chunk or sent in a write chunk. MaxBufSize = 2 * 1024 * 1024 backoffBase = 10 * time.Millisecond backoffMax = 1 * time.Second maxTries = 5 ) // Client is the go wrapper around a ByteStreamClient and provides an interface to it. type Client struct { client pb.ByteStreamClient options []grpc.CallOption } // NewClient creates a new bytestream.Client. func NewClient(cc *grpc.ClientConn, options ...grpc.CallOption) *Client { return &Client{ client: pb.NewByteStreamClient(cc), options: options, } } // Reader reads from a byte stream. type Reader struct { ctx context.Context c *Client readClient pb.ByteStream_ReadClient resourceName string err error buf []byte } // ResourceName gets the resource name this Reader is reading. func (r *Reader) ResourceName() string { return r.resourceName } // Read implements io.Reader. // Read buffers received bytes that do not fit in p. func (r *Reader) Read(p []byte) (int, error) { if r.err != nil { return 0, r.err } var backoffDelay time.Duration for tries := 0; len(r.buf) == 0 && tries < maxTries; tries++ { // No data in buffer. resp, err := r.readClient.Recv() if err != nil { r.err = err return 0, err } r.buf = resp.Data if len(r.buf) != 0 { break } // back off if backoffDelay < backoffBase { backoffDelay = backoffBase } else { backoffDelay = time.Duration(float64(backoffDelay) * 1.3 * (1 - 0.4*rand.Float64())) } if backoffDelay > backoffMax { backoffDelay = backoffMax } select { case <-time.After(backoffDelay): case <-r.ctx.Done(): if err := r.ctx.Err(); err != nil { r.err = err } return 0, r.err } } // Copy from buffer. n := copy(p, r.buf) r.buf = r.buf[n:] return n, nil } // Close implements io.Closer. func (r *Reader) Close() error { if r.readClient == nil { return nil } err := r.readClient.CloseSend() r.readClient = nil return err } // NewReader creates a new Reader to read a resource. func (c *Client) NewReader(ctx context.Context, resourceName string) (*Reader, error) { return c.NewReaderAt(ctx, resourceName, 0) } // NewReaderAt creates a new Reader to read a resource from the given offset. func (c *Client) NewReaderAt(ctx context.Context, resourceName string, offset int64) (*Reader, error) { // readClient is set up for Read(). ReadAt() will copy needed fields into its reentrantReader. readClient, err := c.client.Read(ctx, &pb.ReadRequest{ ResourceName: resourceName, ReadOffset: offset, }, c.options...) if err != nil { return nil, err } return &Reader{ ctx: ctx, c: c, resourceName: resourceName, readClient: readClient, }, nil } // Writer writes to a byte stream. type Writer struct { ctx context.Context writeClient pb.ByteStream_WriteClient resourceName string offset int64 err error } // ResourceName gets the resource name this Writer is writing. func (w *Writer) ResourceName() string { return w.resourceName } // Write implements io.Writer. func (w *Writer) Write(p []byte) (int, error) { if w.err != nil { return 0, w.err } n := 0 for n < len(p) { bufSize := len(p) - n if bufSize > MaxBufSize { bufSize = MaxBufSize } r := pb.WriteRequest{ WriteOffset: w.offset, FinishWrite: false, Data: p[n : n+bufSize], } // Bytestream only requires the resourceName to be sent in the first WriteRequest. if w.offset == 0 { r.ResourceName = w.resourceName } err := w.writeClient.Send(&r) if err != nil { w.err = err return n, err } w.offset += int64(bufSize) n += bufSize } return n, nil } // Close implements io.Closer. It is the caller's responsibility to call Close() when writing is done. func (w *Writer) Close() error { err := w.writeClient.Send(&pb.WriteRequest{ ResourceName: w.resourceName, WriteOffset: w.offset, FinishWrite: true, Data: nil, }) if err != nil { w.err = err return fmt.Errorf("Send(WriteRequest< FinishWrite >) failed: %v", err) } resp, err := w.writeClient.CloseAndRecv() if err != nil { w.err = err return fmt.Errorf("CloseAndRecv: %v", err) } if resp == nil { err = fmt.Errorf("expected a response on close, got %v", resp) } else if resp.CommittedSize != w.offset { err = fmt.Errorf("server only wrote %d bytes, want %d", resp.CommittedSize, w.offset) } w.err = err return err } // NewWriter creates a new Writer to write a resource. // // resourceName specifies the name of the resource. // The resource will be available after Close has been called. // // It is the caller's responsibility to call Close when writing is done. // // TODO: There is currently no way to resume a write. Maybe NewWriter should begin with a call to QueryWriteStatus. func (c *Client) NewWriter(ctx context.Context, resourceName string) (*Writer, error) { wc, err := c.client.Write(ctx, c.options...) if err != nil { return nil, err } return &Writer{ ctx: ctx, writeClient: wc, resourceName: resourceName, }, nil }