You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

122 lines
2.6 KiB

  1. // Copyright 2017 Google LLC
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package rpcreplay
  15. import (
  16. "context"
  17. "io"
  18. "log"
  19. "net"
  20. pb "cloud.google.com/go/rpcreplay/proto/intstore"
  21. "google.golang.org/grpc"
  22. "google.golang.org/grpc/codes"
  23. "google.golang.org/grpc/status"
  24. )
  25. // intStoreServer is an in-memory implementation of IntStore.
  26. type intStoreServer struct {
  27. pb.IntStoreServer
  28. Addr string
  29. l net.Listener
  30. gsrv *grpc.Server
  31. items map[string]int32
  32. }
  33. func newIntStoreServer() *intStoreServer {
  34. l, err := net.Listen("tcp", "127.0.0.1:0")
  35. if err != nil {
  36. log.Fatal(err)
  37. }
  38. s := &intStoreServer{
  39. Addr: l.Addr().String(),
  40. l: l,
  41. gsrv: grpc.NewServer(),
  42. }
  43. pb.RegisterIntStoreServer(s.gsrv, s)
  44. go s.gsrv.Serve(s.l)
  45. return s
  46. }
  47. func (s *intStoreServer) stop() {
  48. s.gsrv.Stop()
  49. s.l.Close()
  50. }
  51. func (s *intStoreServer) Set(_ context.Context, item *pb.Item) (*pb.SetResponse, error) {
  52. old := s.setItem(item)
  53. return &pb.SetResponse{PrevValue: old}, nil
  54. }
  55. func (s *intStoreServer) setItem(item *pb.Item) int32 {
  56. if s.items == nil {
  57. s.items = map[string]int32{}
  58. }
  59. old := s.items[item.Name]
  60. s.items[item.Name] = item.Value
  61. return old
  62. }
  63. func (s *intStoreServer) Get(_ context.Context, req *pb.GetRequest) (*pb.Item, error) {
  64. val, ok := s.items[req.Name]
  65. if !ok {
  66. return nil, status.Errorf(codes.NotFound, "%q", req.Name)
  67. }
  68. return &pb.Item{Name: req.Name, Value: val}, nil
  69. }
  70. func (s *intStoreServer) ListItems(_ *pb.ListItemsRequest, ss pb.IntStore_ListItemsServer) error {
  71. for name, val := range s.items {
  72. if err := ss.Send(&pb.Item{Name: name, Value: val}); err != nil {
  73. return err
  74. }
  75. }
  76. return nil
  77. }
  78. func (s *intStoreServer) SetStream(ss pb.IntStore_SetStreamServer) error {
  79. n := 0
  80. for {
  81. item, err := ss.Recv()
  82. if err == io.EOF {
  83. break
  84. }
  85. if err != nil {
  86. return err
  87. }
  88. s.setItem(item)
  89. n++
  90. }
  91. return ss.SendAndClose(&pb.Summary{Count: int32(n)})
  92. }
  93. func (s *intStoreServer) StreamChat(ss pb.IntStore_StreamChatServer) error {
  94. for {
  95. item, err := ss.Recv()
  96. if err == io.EOF {
  97. break
  98. }
  99. if err != nil {
  100. return err
  101. }
  102. if err := ss.Send(item); err != nil {
  103. return err
  104. }
  105. }
  106. return nil
  107. }