You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

266 line
6.8 KiB

  1. /*
  2. Copyright 2017 Google LLC
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package testutil
  14. import (
  15. "context"
  16. "encoding/binary"
  17. "fmt"
  18. "io"
  19. "net"
  20. "sync"
  21. "testing"
  22. "time"
  23. "github.com/golang/protobuf/ptypes/empty"
  24. proto3 "github.com/golang/protobuf/ptypes/struct"
  25. pbt "github.com/golang/protobuf/ptypes/timestamp"
  26. sppb "google.golang.org/genproto/googleapis/spanner/v1"
  27. "google.golang.org/grpc"
  28. "google.golang.org/grpc/codes"
  29. "google.golang.org/grpc/status"
  30. )
  31. var (
  32. // KvMeta is the Metadata for mocked KV table.
  33. KvMeta = sppb.ResultSetMetadata{
  34. RowType: &sppb.StructType{
  35. Fields: []*sppb.StructType_Field{
  36. {
  37. Name: "Key",
  38. Type: &sppb.Type{Code: sppb.TypeCode_STRING},
  39. },
  40. {
  41. Name: "Value",
  42. Type: &sppb.Type{Code: sppb.TypeCode_STRING},
  43. },
  44. },
  45. },
  46. }
  47. )
  // MockCtlMsg is one control message queued for the mock server. It tells the
  // server either to emit a row (optionally carrying a resume token) or to
  // fail the RPC with an error.
  type MockCtlMsg struct {
  	// ResumeToken, when true, makes the mock server attach a resume token
  	// to the row it generates for this message.
  	ResumeToken bool
  	// Err, when non-nil, is returned by the mock server as the RPC error
  	// instead of a row.
  	Err error
  }
  57. // MockCloudSpanner is a mock implementation of SpannerServer interface.
  58. // TODO: make MockCloudSpanner a full-fleged Cloud Spanner implementation.
  59. type MockCloudSpanner struct {
  60. sppb.SpannerServer
  61. s *grpc.Server
  62. t *testing.T
  63. addr string
  64. msgs chan MockCtlMsg
  65. readTs time.Time
  66. mu sync.Mutex
  67. next int
  68. nextSession int
  69. sessions map[string]*sppb.Session
  70. }
  71. // Addr returns the listening address of mock server.
  72. func (m *MockCloudSpanner) Addr() string {
  73. return m.addr
  74. }
  75. // AddMsg generates a new mocked row which can be received by client.
  76. func (m *MockCloudSpanner) AddMsg(err error, resumeToken bool) {
  77. msg := MockCtlMsg{
  78. ResumeToken: resumeToken,
  79. Err: err,
  80. }
  81. if err == io.EOF {
  82. close(m.msgs)
  83. } else {
  84. m.msgs <- msg
  85. }
  86. }
  87. // Done signals an end to a mocked stream.
  88. func (m *MockCloudSpanner) Done() {
  89. close(m.msgs)
  90. }
  91. // CreateSession is a placeholder for SpannerServer.CreateSession.
  92. func (m *MockCloudSpanner) CreateSession(c context.Context, r *sppb.CreateSessionRequest) (*sppb.Session, error) {
  93. m.mu.Lock()
  94. defer m.mu.Unlock()
  95. name := fmt.Sprintf("session-%d", m.nextSession)
  96. m.nextSession++
  97. s := &sppb.Session{Name: name}
  98. m.sessions[name] = s
  99. return s, nil
  100. }
  101. // GetSession is a placeholder for SpannerServer.GetSession.
  102. func (m *MockCloudSpanner) GetSession(c context.Context, r *sppb.GetSessionRequest) (*sppb.Session, error) {
  103. m.mu.Lock()
  104. defer m.mu.Unlock()
  105. if s, ok := m.sessions[r.Name]; ok {
  106. return s, nil
  107. }
  108. return nil, status.Errorf(codes.NotFound, "not found")
  109. }
  110. // DeleteSession is a placeholder for SpannerServer.DeleteSession.
  111. func (m *MockCloudSpanner) DeleteSession(c context.Context, r *sppb.DeleteSessionRequest) (*empty.Empty, error) {
  112. m.mu.Lock()
  113. defer m.mu.Unlock()
  114. delete(m.sessions, r.Name)
  115. return &empty.Empty{}, nil
  116. }
  // EncodeResumeToken returns the mock resume-token encoding of t: a 16-byte
  // slice whose leading bytes hold t as an unsigned varint and whose remaining
  // bytes are zero.
  func EncodeResumeToken(t uint64) []byte {
  	var token [16]byte
  	binary.PutUvarint(token[:], t)
  	return token[:]
  }
  // DecodeResumeToken decodes a mock resume token into the uint64 it encodes.
  // The value is read as an unsigned varint from the start of t; any bytes
  // after the varint are ignored. An error is returned when t is empty,
  // truncated, or overflows 64 bits.
  func DecodeResumeToken(t []byte) (uint64, error) {
  	if value, read := binary.Uvarint(t); read > 0 {
  		return value, nil
  	}
  	return 0, fmt.Errorf("invalid resume token: %v", t)
  }
  131. // ExecuteStreamingSql is a mock implementation of SpannerServer.ExecuteStreamingSql.
  132. func (m *MockCloudSpanner) ExecuteStreamingSql(r *sppb.ExecuteSqlRequest, s sppb.Spanner_ExecuteStreamingSqlServer) error {
  133. switch r.Sql {
  134. case "SELECT * from t_unavailable":
  135. return status.Errorf(codes.Unavailable, "mock table unavailable")
  136. case "UPDATE t SET x = 2 WHERE x = 1":
  137. err := s.Send(&sppb.PartialResultSet{
  138. Stats: &sppb.ResultSetStats{RowCount: &sppb.ResultSetStats_RowCountLowerBound{3}},
  139. })
  140. if err != nil {
  141. panic(err)
  142. }
  143. return nil
  144. case "SELECT t.key key, t.value value FROM t_mock t":
  145. if r.ResumeToken != nil {
  146. s, err := DecodeResumeToken(r.ResumeToken)
  147. if err != nil {
  148. return err
  149. }
  150. m.mu.Lock()
  151. m.next = int(s) + 1
  152. m.mu.Unlock()
  153. }
  154. for {
  155. msg, more := <-m.msgs
  156. if !more {
  157. break
  158. }
  159. if msg.Err == nil {
  160. var rt []byte
  161. if msg.ResumeToken {
  162. m.mu.Lock()
  163. rt = EncodeResumeToken(uint64(m.next))
  164. m.mu.Unlock()
  165. }
  166. meta := KvMeta
  167. meta.Transaction = &sppb.Transaction{
  168. ReadTimestamp: &pbt.Timestamp{
  169. Seconds: m.readTs.Unix(),
  170. Nanos: int32(m.readTs.Nanosecond()),
  171. },
  172. }
  173. m.mu.Lock()
  174. next := m.next
  175. m.next++
  176. m.mu.Unlock()
  177. err := s.Send(&sppb.PartialResultSet{
  178. Metadata: &meta,
  179. Values: []*proto3.Value{
  180. {Kind: &proto3.Value_StringValue{StringValue: fmt.Sprintf("foo-%02d", next)}},
  181. {Kind: &proto3.Value_StringValue{StringValue: fmt.Sprintf("bar-%02d", next)}},
  182. },
  183. ResumeToken: rt,
  184. })
  185. if err != nil {
  186. return err
  187. }
  188. continue
  189. }
  190. return msg.Err
  191. }
  192. return nil
  193. default:
  194. return fmt.Errorf("unsupported SQL: %v", r.Sql)
  195. }
  196. }
  197. // StreamingRead is a placeholder for SpannerServer.StreamingRead.
  198. func (m *MockCloudSpanner) StreamingRead(r *sppb.ReadRequest, s sppb.Spanner_StreamingReadServer) error {
  199. return s.Send(&sppb.PartialResultSet{})
  200. }
  201. // Serve runs a MockCloudSpanner listening on a random localhost address.
  202. func (m *MockCloudSpanner) Serve() {
  203. m.s = grpc.NewServer()
  204. if m.addr == "" {
  205. m.addr = "localhost:0"
  206. }
  207. lis, err := net.Listen("tcp", m.addr)
  208. if err != nil {
  209. m.t.Fatalf("Failed to listen: %v", err)
  210. }
  211. _, port, err := net.SplitHostPort(lis.Addr().String())
  212. if err != nil {
  213. m.t.Fatalf("Failed to parse listener address: %v", err)
  214. }
  215. sppb.RegisterSpannerServer(m.s, m)
  216. m.addr = "localhost:" + port
  217. go m.s.Serve(lis)
  218. }
  219. // BeginTransaction is a placeholder for SpannerServer.BeginTransaction.
  220. func (m *MockCloudSpanner) BeginTransaction(_ context.Context, r *sppb.BeginTransactionRequest) (*sppb.Transaction, error) {
  221. m.mu.Lock()
  222. defer m.mu.Unlock()
  223. return &sppb.Transaction{}, nil
  224. }
  225. // Stop terminates MockCloudSpanner and closes the serving port.
  226. func (m *MockCloudSpanner) Stop() {
  227. m.s.Stop()
  228. }
  229. // NewMockCloudSpanner creates a new MockCloudSpanner instance.
  230. func NewMockCloudSpanner(t *testing.T, ts time.Time) *MockCloudSpanner {
  231. mcs := &MockCloudSpanner{
  232. t: t,
  233. msgs: make(chan MockCtlMsg, 1000),
  234. readTs: ts,
  235. sessions: map[string]*sppb.Session{},
  236. }
  237. return mcs
  238. }