You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

309 lines
9.4 KiB

  1. /*
  2. Copyright 2017 Google LLC
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package testutil
  14. import (
  15. "context"
  16. "errors"
  17. "fmt"
  18. "sync"
  19. "testing"
  20. "time"
  21. "github.com/golang/protobuf/proto"
  22. "github.com/golang/protobuf/ptypes/empty"
  23. proto3 "github.com/golang/protobuf/ptypes/struct"
  24. pbt "github.com/golang/protobuf/ptypes/timestamp"
  25. pbs "google.golang.org/genproto/googleapis/rpc/status"
  26. sppb "google.golang.org/genproto/googleapis/spanner/v1"
  27. "google.golang.org/grpc"
  28. "google.golang.org/grpc/codes"
  29. "google.golang.org/grpc/status"
  30. )
  31. // MockCloudSpannerClient is a mock implementation of sppb.SpannerClient.
  32. type MockCloudSpannerClient struct {
  33. sppb.SpannerClient
  34. mu sync.Mutex
  35. t *testing.T
  36. // Live sessions on the client.
  37. sessions map[string]bool
  38. // Session ping history.
  39. pings []string
  40. // Client will stall on any requests.
  41. freezed chan struct{}
  42. // Expected set of actions that have been executed by the client. These
  43. // interfaces should be type reflected against with *Request types in sppb,
  44. // such as sppb.GetSessionRequest. Buffered to a large degree.
  45. ReceivedRequests chan interface{}
  46. }
  47. // NewMockCloudSpannerClient creates new MockCloudSpannerClient instance.
  48. func NewMockCloudSpannerClient(t *testing.T) *MockCloudSpannerClient {
  49. mc := &MockCloudSpannerClient{
  50. t: t,
  51. sessions: map[string]bool{},
  52. ReceivedRequests: make(chan interface{}, 100000),
  53. }
  54. // Produce a closed channel, so the default action of ready is to not block.
  55. mc.Freeze()
  56. mc.Unfreeze()
  57. return mc
  58. }
  59. // DumpPings dumps the ping history.
  60. func (m *MockCloudSpannerClient) DumpPings() []string {
  61. m.mu.Lock()
  62. defer m.mu.Unlock()
  63. return append([]string(nil), m.pings...)
  64. }
  65. // DumpSessions dumps the internal session table.
  66. func (m *MockCloudSpannerClient) DumpSessions() map[string]bool {
  67. m.mu.Lock()
  68. defer m.mu.Unlock()
  69. st := map[string]bool{}
  70. for s, v := range m.sessions {
  71. st[s] = v
  72. }
  73. return st
  74. }
  75. // CreateSession is a placeholder for SpannerClient.CreateSession.
  76. func (m *MockCloudSpannerClient) CreateSession(ctx context.Context, r *sppb.CreateSessionRequest, opts ...grpc.CallOption) (*sppb.Session, error) {
  77. m.ready()
  78. m.ReceivedRequests <- r
  79. m.mu.Lock()
  80. defer m.mu.Unlock()
  81. s := &sppb.Session{}
  82. if r.Database != "mockdb" {
  83. // Reject other databases
  84. return s, status.Errorf(codes.NotFound, fmt.Sprintf("database not found: %v", r.Database))
  85. }
  86. // Generate & record session name.
  87. s.Name = fmt.Sprintf("mockdb-%v", time.Now().UnixNano())
  88. m.sessions[s.Name] = true
  89. return s, nil
  90. }
  91. // GetSession is a placeholder for SpannerClient.GetSession.
  92. func (m *MockCloudSpannerClient) GetSession(ctx context.Context, r *sppb.GetSessionRequest, opts ...grpc.CallOption) (*sppb.Session, error) {
  93. m.ready()
  94. m.ReceivedRequests <- r
  95. m.mu.Lock()
  96. defer m.mu.Unlock()
  97. m.pings = append(m.pings, r.Name)
  98. if _, ok := m.sessions[r.Name]; !ok {
  99. return nil, status.Errorf(codes.NotFound, fmt.Sprintf("Session not found: %v", r.Name))
  100. }
  101. return &sppb.Session{Name: r.Name}, nil
  102. }
  103. // DeleteSession is a placeholder for SpannerClient.DeleteSession.
  104. func (m *MockCloudSpannerClient) DeleteSession(ctx context.Context, r *sppb.DeleteSessionRequest, opts ...grpc.CallOption) (*empty.Empty, error) {
  105. m.ready()
  106. m.ReceivedRequests <- r
  107. m.mu.Lock()
  108. defer m.mu.Unlock()
  109. if _, ok := m.sessions[r.Name]; !ok {
  110. // Session not found.
  111. return &empty.Empty{}, status.Errorf(codes.NotFound, fmt.Sprintf("Session not found: %v", r.Name))
  112. }
  113. // Delete session from in-memory table.
  114. delete(m.sessions, r.Name)
  115. return &empty.Empty{}, nil
  116. }
  117. // ExecuteSql is a placeholder for SpannerClient.ExecuteSql.
  118. func (m *MockCloudSpannerClient) ExecuteSql(ctx context.Context, r *sppb.ExecuteSqlRequest, opts ...grpc.CallOption) (*sppb.ResultSet, error) {
  119. m.ready()
  120. m.ReceivedRequests <- r
  121. m.mu.Lock()
  122. defer m.mu.Unlock()
  123. return &sppb.ResultSet{Stats: &sppb.ResultSetStats{RowCount: &sppb.ResultSetStats_RowCountExact{7}}}, nil
  124. }
  125. // ExecuteBatchDml is a placeholder for SpannerClient.ExecuteBatchDml.
  126. func (m *MockCloudSpannerClient) ExecuteBatchDml(ctx context.Context, r *sppb.ExecuteBatchDmlRequest, opts ...grpc.CallOption) (*sppb.ExecuteBatchDmlResponse, error) {
  127. m.ready()
  128. m.ReceivedRequests <- r
  129. m.mu.Lock()
  130. defer m.mu.Unlock()
  131. return &sppb.ExecuteBatchDmlResponse{Status: &pbs.Status{Code: 0}, ResultSets: []*sppb.ResultSet{}}, nil
  132. }
  133. // ExecuteStreamingSql is a mock implementation of SpannerClient.ExecuteStreamingSql.
  134. func (m *MockCloudSpannerClient) ExecuteStreamingSql(ctx context.Context, r *sppb.ExecuteSqlRequest, opts ...grpc.CallOption) (sppb.Spanner_ExecuteStreamingSqlClient, error) {
  135. m.ready()
  136. m.ReceivedRequests <- r
  137. m.mu.Lock()
  138. defer m.mu.Unlock()
  139. wantReq := &sppb.ExecuteSqlRequest{
  140. Session: "mocksession",
  141. Transaction: &sppb.TransactionSelector{
  142. Selector: &sppb.TransactionSelector_SingleUse{
  143. SingleUse: &sppb.TransactionOptions{
  144. Mode: &sppb.TransactionOptions_ReadOnly_{
  145. ReadOnly: &sppb.TransactionOptions_ReadOnly{
  146. TimestampBound: &sppb.TransactionOptions_ReadOnly_Strong{
  147. Strong: true,
  148. },
  149. ReturnReadTimestamp: false,
  150. },
  151. },
  152. },
  153. },
  154. },
  155. Sql: "mockquery",
  156. Params: &proto3.Struct{
  157. Fields: map[string]*proto3.Value{"var1": {Kind: &proto3.Value_StringValue{StringValue: "abc"}}},
  158. },
  159. ParamTypes: map[string]*sppb.Type{"var1": {Code: sppb.TypeCode_STRING}},
  160. }
  161. if !proto.Equal(r, wantReq) {
  162. return nil, fmt.Errorf("got query request: %v, want: %v", r, wantReq)
  163. }
  164. return nil, errors.New("query never succeeds on mock client")
  165. }
  166. // StreamingRead is a placeholder for SpannerClient.StreamingRead.
  167. func (m *MockCloudSpannerClient) StreamingRead(ctx context.Context, r *sppb.ReadRequest, opts ...grpc.CallOption) (sppb.Spanner_StreamingReadClient, error) {
  168. m.ready()
  169. m.ReceivedRequests <- r
  170. m.mu.Lock()
  171. defer m.mu.Unlock()
  172. wantReq := &sppb.ReadRequest{
  173. Session: "mocksession",
  174. Transaction: &sppb.TransactionSelector{
  175. Selector: &sppb.TransactionSelector_SingleUse{
  176. SingleUse: &sppb.TransactionOptions{
  177. Mode: &sppb.TransactionOptions_ReadOnly_{
  178. ReadOnly: &sppb.TransactionOptions_ReadOnly{
  179. TimestampBound: &sppb.TransactionOptions_ReadOnly_Strong{
  180. Strong: true,
  181. },
  182. ReturnReadTimestamp: false,
  183. },
  184. },
  185. },
  186. },
  187. },
  188. Table: "t_mock",
  189. Columns: []string{"col1", "col2"},
  190. KeySet: &sppb.KeySet{
  191. Keys: []*proto3.ListValue{
  192. {
  193. Values: []*proto3.Value{
  194. {Kind: &proto3.Value_StringValue{StringValue: "foo"}},
  195. },
  196. },
  197. },
  198. Ranges: []*sppb.KeyRange{},
  199. All: false,
  200. },
  201. }
  202. if !proto.Equal(r, wantReq) {
  203. return nil, fmt.Errorf("got query request: %v, want: %v", r, wantReq)
  204. }
  205. return nil, errors.New("read never succeeds on mock client")
  206. }
  207. // BeginTransaction is a placeholder for SpannerClient.BeginTransaction.
  208. func (m *MockCloudSpannerClient) BeginTransaction(ctx context.Context, r *sppb.BeginTransactionRequest, opts ...grpc.CallOption) (*sppb.Transaction, error) {
  209. m.ready()
  210. m.ReceivedRequests <- r
  211. m.mu.Lock()
  212. defer m.mu.Unlock()
  213. resp := &sppb.Transaction{Id: []byte("transaction-1")}
  214. if _, ok := r.Options.Mode.(*sppb.TransactionOptions_ReadOnly_); ok {
  215. resp.ReadTimestamp = &pbt.Timestamp{Seconds: 3, Nanos: 4}
  216. }
  217. return resp, nil
  218. }
  219. // Commit is a placeholder for SpannerClient.Commit.
  220. func (m *MockCloudSpannerClient) Commit(ctx context.Context, r *sppb.CommitRequest, opts ...grpc.CallOption) (*sppb.CommitResponse, error) {
  221. m.ready()
  222. m.ReceivedRequests <- r
  223. m.mu.Lock()
  224. defer m.mu.Unlock()
  225. return &sppb.CommitResponse{CommitTimestamp: &pbt.Timestamp{Seconds: 1, Nanos: 2}}, nil
  226. }
  227. // Rollback is a placeholder for SpannerClient.Rollback.
  228. func (m *MockCloudSpannerClient) Rollback(ctx context.Context, r *sppb.RollbackRequest, opts ...grpc.CallOption) (*empty.Empty, error) {
  229. m.ready()
  230. m.ReceivedRequests <- r
  231. m.mu.Lock()
  232. defer m.mu.Unlock()
  233. return nil, nil
  234. }
  235. // PartitionQuery is a placeholder for SpannerServer.PartitionQuery.
  236. func (m *MockCloudSpannerClient) PartitionQuery(ctx context.Context, r *sppb.PartitionQueryRequest, opts ...grpc.CallOption) (*sppb.PartitionResponse, error) {
  237. m.ready()
  238. m.ReceivedRequests <- r
  239. return nil, errors.New("Unimplemented")
  240. }
  241. // PartitionRead is a placeholder for SpannerServer.PartitionRead.
  242. func (m *MockCloudSpannerClient) PartitionRead(ctx context.Context, r *sppb.PartitionReadRequest, opts ...grpc.CallOption) (*sppb.PartitionResponse, error) {
  243. m.ready()
  244. m.ReceivedRequests <- r
  245. return nil, errors.New("Unimplemented")
  246. }
  247. // Freeze stalls all requests.
  248. func (m *MockCloudSpannerClient) Freeze() {
  249. m.mu.Lock()
  250. defer m.mu.Unlock()
  251. m.freezed = make(chan struct{})
  252. }
  253. // Unfreeze restores processing requests.
  254. func (m *MockCloudSpannerClient) Unfreeze() {
  255. m.mu.Lock()
  256. defer m.mu.Unlock()
  257. close(m.freezed)
  258. }
  259. // ready checks conditions before executing requests
  260. // TODO: add checks for injected errors, actions
  261. func (m *MockCloudSpannerClient) ready() {
  262. m.mu.Lock()
  263. freezed := m.freezed
  264. m.mu.Unlock()
  265. // check if client should be freezed
  266. <-freezed
  267. }