You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

569 lines
13 KiB

  1. // Copyright 2017 Google LLC
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package rpcreplay
  15. import (
  16. "bytes"
  17. "context"
  18. "errors"
  19. "io"
  20. "strings"
  21. "testing"
  22. "cloud.google.com/go/internal/testutil"
  23. ipb "cloud.google.com/go/rpcreplay/proto/intstore"
  24. rpb "cloud.google.com/go/rpcreplay/proto/rpcreplay"
  25. "github.com/golang/protobuf/proto"
  26. "github.com/google/go-cmp/cmp"
  27. "google.golang.org/grpc"
  28. "google.golang.org/grpc/codes"
  29. "google.golang.org/grpc/status"
  30. )
  31. func TestRecordIO(t *testing.T) {
  32. buf := &bytes.Buffer{}
  33. want := []byte{1, 2, 3}
  34. if err := writeRecord(buf, want); err != nil {
  35. t.Fatal(err)
  36. }
  37. got, err := readRecord(buf)
  38. if err != nil {
  39. t.Fatal(err)
  40. }
  41. if !bytes.Equal(got, want) {
  42. t.Errorf("got %v, want %v", got, want)
  43. }
  44. }
  45. func TestHeaderIO(t *testing.T) {
  46. buf := &bytes.Buffer{}
  47. want := []byte{1, 2, 3}
  48. if err := writeHeader(buf, want); err != nil {
  49. t.Fatal(err)
  50. }
  51. got, err := readHeader(buf)
  52. if err != nil {
  53. t.Fatal(err)
  54. }
  55. if !testutil.Equal(got, want) {
  56. t.Errorf("got %v, want %v", got, want)
  57. }
  58. // readHeader errors
  59. for _, contents := range []string{"", "badmagic", "gRPCReplay"} {
  60. if _, err := readHeader(bytes.NewBufferString(contents)); err == nil {
  61. t.Errorf("%q: got nil, want error", contents)
  62. }
  63. }
  64. }
  65. func TestEntryIO(t *testing.T) {
  66. for i, want := range []*entry{
  67. {
  68. kind: rpb.Entry_REQUEST,
  69. method: "method",
  70. msg: message{msg: &rpb.Entry{}},
  71. refIndex: 7,
  72. },
  73. {
  74. kind: rpb.Entry_RESPONSE,
  75. method: "method",
  76. msg: message{err: status.Error(codes.NotFound, "not found")},
  77. refIndex: 8,
  78. },
  79. {
  80. kind: rpb.Entry_RECV,
  81. method: "method",
  82. msg: message{err: io.EOF},
  83. refIndex: 3,
  84. },
  85. } {
  86. buf := &bytes.Buffer{}
  87. if err := writeEntry(buf, want); err != nil {
  88. t.Fatal(err)
  89. }
  90. got, err := readEntry(buf)
  91. if err != nil {
  92. t.Fatal(err)
  93. }
  94. if !got.equal(want) {
  95. t.Errorf("#%d: got %v, want %v", i, got, want)
  96. }
  97. }
  98. }
// initialState is the initial-state payload handed to NewRecorderWriter by
// record; TestRecord and TestReplay expect to read the same bytes back from
// the recording.
var initialState = []byte{1, 2, 3}
  100. func TestRecord(t *testing.T) {
  101. srv := newIntStoreServer()
  102. defer srv.stop()
  103. buf := record(t, srv)
  104. gotIstate, err := readHeader(buf)
  105. if err != nil {
  106. t.Fatal(err)
  107. }
  108. if !testutil.Equal(gotIstate, initialState) {
  109. t.Fatalf("got %v, want %v", gotIstate, initialState)
  110. }
  111. item := &ipb.Item{Name: "a", Value: 1}
  112. wantEntries := []*entry{
  113. // Set
  114. {
  115. kind: rpb.Entry_REQUEST,
  116. method: "/intstore.IntStore/Set",
  117. msg: message{msg: item},
  118. },
  119. {
  120. kind: rpb.Entry_RESPONSE,
  121. msg: message{msg: &ipb.SetResponse{PrevValue: 0}},
  122. refIndex: 1,
  123. },
  124. // Get
  125. {
  126. kind: rpb.Entry_REQUEST,
  127. method: "/intstore.IntStore/Get",
  128. msg: message{msg: &ipb.GetRequest{Name: "a"}},
  129. },
  130. {
  131. kind: rpb.Entry_RESPONSE,
  132. msg: message{msg: item},
  133. refIndex: 3,
  134. },
  135. {
  136. kind: rpb.Entry_REQUEST,
  137. method: "/intstore.IntStore/Get",
  138. msg: message{msg: &ipb.GetRequest{Name: "x"}},
  139. },
  140. {
  141. kind: rpb.Entry_RESPONSE,
  142. msg: message{err: status.Error(codes.NotFound, `"x"`)},
  143. refIndex: 5,
  144. },
  145. // ListItems
  146. { // entry #7
  147. kind: rpb.Entry_CREATE_STREAM,
  148. method: "/intstore.IntStore/ListItems",
  149. },
  150. {
  151. kind: rpb.Entry_SEND,
  152. msg: message{msg: &ipb.ListItemsRequest{}},
  153. refIndex: 7,
  154. },
  155. {
  156. kind: rpb.Entry_RECV,
  157. msg: message{msg: item},
  158. refIndex: 7,
  159. },
  160. {
  161. kind: rpb.Entry_RECV,
  162. msg: message{err: io.EOF},
  163. refIndex: 7,
  164. },
  165. // SetStream
  166. { // entry #11
  167. kind: rpb.Entry_CREATE_STREAM,
  168. method: "/intstore.IntStore/SetStream",
  169. },
  170. {
  171. kind: rpb.Entry_SEND,
  172. msg: message{msg: &ipb.Item{Name: "b", Value: 2}},
  173. refIndex: 11,
  174. },
  175. {
  176. kind: rpb.Entry_SEND,
  177. msg: message{msg: &ipb.Item{Name: "c", Value: 3}},
  178. refIndex: 11,
  179. },
  180. {
  181. kind: rpb.Entry_RECV,
  182. msg: message{msg: &ipb.Summary{Count: 2}},
  183. refIndex: 11,
  184. },
  185. // StreamChat
  186. { // entry #15
  187. kind: rpb.Entry_CREATE_STREAM,
  188. method: "/intstore.IntStore/StreamChat",
  189. },
  190. {
  191. kind: rpb.Entry_SEND,
  192. msg: message{msg: &ipb.Item{Name: "d", Value: 4}},
  193. refIndex: 15,
  194. },
  195. {
  196. kind: rpb.Entry_RECV,
  197. msg: message{msg: &ipb.Item{Name: "d", Value: 4}},
  198. refIndex: 15,
  199. },
  200. {
  201. kind: rpb.Entry_SEND,
  202. msg: message{msg: &ipb.Item{Name: "e", Value: 5}},
  203. refIndex: 15,
  204. },
  205. {
  206. kind: rpb.Entry_RECV,
  207. msg: message{msg: &ipb.Item{Name: "e", Value: 5}},
  208. refIndex: 15,
  209. },
  210. {
  211. kind: rpb.Entry_RECV,
  212. msg: message{err: io.EOF},
  213. refIndex: 15,
  214. },
  215. }
  216. for i, w := range wantEntries {
  217. g, err := readEntry(buf)
  218. if err != nil {
  219. t.Fatalf("#%d: %v", i+1, err)
  220. }
  221. if !g.equal(w) {
  222. t.Errorf("#%d:\ngot %+v\nwant %+v", i+1, g, w)
  223. }
  224. }
  225. g, err := readEntry(buf)
  226. if err != nil {
  227. t.Fatal(err)
  228. }
  229. if g != nil {
  230. t.Errorf("\ngot %+v\nwant nil", g)
  231. }
  232. }
  233. func TestReplay(t *testing.T) {
  234. srv := newIntStoreServer()
  235. defer srv.stop()
  236. buf := record(t, srv)
  237. rep, err := NewReplayerReader(buf)
  238. if err != nil {
  239. t.Fatal(err)
  240. }
  241. if got, want := rep.Initial(), initialState; !testutil.Equal(got, want) {
  242. t.Fatalf("got %v, want %v", got, want)
  243. }
  244. // Replay the test.
  245. conn, err := rep.Connection()
  246. if err != nil {
  247. t.Fatal(err)
  248. }
  249. testService(t, conn)
  250. }
  251. func record(t *testing.T, srv *intStoreServer) *bytes.Buffer {
  252. buf := &bytes.Buffer{}
  253. rec, err := NewRecorderWriter(buf, initialState)
  254. if err != nil {
  255. t.Fatal(err)
  256. }
  257. conn, err := grpc.Dial(srv.Addr,
  258. append([]grpc.DialOption{grpc.WithInsecure()}, rec.DialOptions()...)...)
  259. if err != nil {
  260. t.Fatal(err)
  261. }
  262. defer conn.Close()
  263. testService(t, conn)
  264. if err := rec.Close(); err != nil {
  265. t.Fatal(err)
  266. }
  267. return buf
  268. }
  269. func testService(t *testing.T, conn *grpc.ClientConn) {
  270. client := ipb.NewIntStoreClient(conn)
  271. ctx := context.Background()
  272. item := &ipb.Item{Name: "a", Value: 1}
  273. res, err := client.Set(ctx, item)
  274. if err != nil {
  275. t.Fatal(err)
  276. }
  277. if res.PrevValue != 0 {
  278. t.Errorf("got %d, want 0", res.PrevValue)
  279. }
  280. got, err := client.Get(ctx, &ipb.GetRequest{Name: "a"})
  281. if err != nil {
  282. t.Fatal(err)
  283. }
  284. if !proto.Equal(got, item) {
  285. t.Errorf("got %v, want %v", got, item)
  286. }
  287. _, err = client.Get(ctx, &ipb.GetRequest{Name: "x"})
  288. if err == nil {
  289. t.Fatal("got nil, want error")
  290. }
  291. if _, ok := status.FromError(err); !ok {
  292. t.Errorf("got error type %T, want a grpc/status.Status", err)
  293. }
  294. wantItems := []*ipb.Item{item}
  295. lic, err := client.ListItems(ctx, &ipb.ListItemsRequest{})
  296. if err != nil {
  297. t.Fatal(err)
  298. }
  299. for i := 0; ; i++ {
  300. item, err := lic.Recv()
  301. if err == io.EOF {
  302. break
  303. }
  304. if err != nil {
  305. t.Fatal(err)
  306. }
  307. if i >= len(wantItems) || !proto.Equal(item, wantItems[i]) {
  308. t.Fatalf("%d: bad item", i)
  309. }
  310. }
  311. ssc, err := client.SetStream(ctx)
  312. if err != nil {
  313. t.Fatal(err)
  314. }
  315. must := func(err error) {
  316. if err != nil {
  317. t.Fatal(err)
  318. }
  319. }
  320. for i, name := range []string{"b", "c"} {
  321. must(ssc.Send(&ipb.Item{Name: name, Value: int32(i + 2)}))
  322. }
  323. summary, err := ssc.CloseAndRecv()
  324. if err != nil {
  325. t.Fatal(err)
  326. }
  327. if got, want := summary.Count, int32(2); got != want {
  328. t.Fatalf("got %d, want %d", got, want)
  329. }
  330. chatc, err := client.StreamChat(ctx)
  331. if err != nil {
  332. t.Fatal(err)
  333. }
  334. for i, name := range []string{"d", "e"} {
  335. item := &ipb.Item{Name: name, Value: int32(i + 4)}
  336. must(chatc.Send(item))
  337. got, err := chatc.Recv()
  338. if err != nil {
  339. t.Fatal(err)
  340. }
  341. if !proto.Equal(got, item) {
  342. t.Errorf("got %v, want %v", got, item)
  343. }
  344. }
  345. must(chatc.CloseSend())
  346. if _, err := chatc.Recv(); err != io.EOF {
  347. t.Fatalf("got %v, want EOF", err)
  348. }
  349. }
  350. func TestRecorderBeforeFunc(t *testing.T) {
  351. var tests = []struct {
  352. name string
  353. msg, wantRespMsg, wantEntryMsg *ipb.Item
  354. f func(string, proto.Message) error
  355. wantErr bool
  356. }{
  357. {
  358. name: "BeforeFunc should modify messages saved, but not alter what is sent/received to/from services",
  359. msg: &ipb.Item{Name: "foo", Value: 1},
  360. wantEntryMsg: &ipb.Item{Name: "bar", Value: 2},
  361. wantRespMsg: &ipb.Item{Name: "foo", Value: 1},
  362. f: func(method string, m proto.Message) error {
  363. // This callback only runs when Set is called.
  364. if !strings.HasSuffix(method, "Set") {
  365. return nil
  366. }
  367. if _, ok := m.(*ipb.Item); !ok {
  368. return nil
  369. }
  370. item := m.(*ipb.Item)
  371. item.Name = "bar"
  372. item.Value = 2
  373. return nil
  374. },
  375. },
  376. {
  377. name: "BeforeFunc should not be able to alter returned responses",
  378. msg: &ipb.Item{Name: "foo", Value: 1},
  379. wantRespMsg: &ipb.Item{Name: "foo", Value: 1},
  380. f: func(method string, m proto.Message) error {
  381. // This callback only runs when Get is called.
  382. if !strings.HasSuffix(method, "Get") {
  383. return nil
  384. }
  385. if _, ok := m.(*ipb.Item); !ok {
  386. return nil
  387. }
  388. item := m.(*ipb.Item)
  389. item.Value = 2
  390. return nil
  391. },
  392. },
  393. {
  394. name: "Errors should cause the RPC send to fail",
  395. msg: &ipb.Item{},
  396. f: func(_ string, _ proto.Message) error {
  397. return errors.New("err")
  398. },
  399. wantErr: true,
  400. },
  401. }
  402. for _, tc := range tests {
  403. // Wrap test cases in a func so defers execute correctly.
  404. func() {
  405. srv := newIntStoreServer()
  406. defer srv.stop()
  407. var b bytes.Buffer
  408. r, err := NewRecorderWriter(&b, nil)
  409. if err != nil {
  410. t.Error(err)
  411. return
  412. }
  413. r.BeforeFunc = tc.f
  414. ctx := context.Background()
  415. conn, err := grpc.DialContext(ctx, srv.Addr, append([]grpc.DialOption{grpc.WithInsecure()}, r.DialOptions()...)...)
  416. if err != nil {
  417. t.Error(err)
  418. return
  419. }
  420. defer conn.Close()
  421. client := ipb.NewIntStoreClient(conn)
  422. _, err = client.Set(ctx, tc.msg)
  423. switch {
  424. case err != nil && !tc.wantErr:
  425. t.Error(err)
  426. return
  427. case err == nil && tc.wantErr:
  428. t.Errorf("got nil; want error")
  429. return
  430. case err != nil:
  431. // Error found as expected, don't check Get().
  432. return
  433. }
  434. if tc.wantRespMsg != nil {
  435. got, err := client.Get(ctx, &ipb.GetRequest{Name: tc.msg.GetName()})
  436. if err != nil {
  437. t.Error(err)
  438. return
  439. }
  440. if !cmp.Equal(got, tc.wantRespMsg) {
  441. t.Errorf("got %+v; want %+v", got, tc.wantRespMsg)
  442. }
  443. }
  444. r.Close()
  445. if tc.wantEntryMsg != nil {
  446. _, _ = readHeader(&b)
  447. e, err := readEntry(&b)
  448. if err != nil {
  449. t.Error(err)
  450. return
  451. }
  452. got := e.msg.msg.(*ipb.Item)
  453. if !cmp.Equal(got, tc.wantEntryMsg) {
  454. t.Errorf("got %v; want %v", got, tc.wantEntryMsg)
  455. }
  456. }
  457. }()
  458. }
  459. }
  460. func TestReplayerBeforeFunc(t *testing.T) {
  461. var tests = []struct {
  462. name string
  463. msg, reqMsg *ipb.Item
  464. f func(string, proto.Message) error
  465. wantErr bool
  466. }{
  467. {
  468. name: "BeforeFunc should modify messages sent before they are passed to the replayer",
  469. msg: &ipb.Item{Name: "foo", Value: 1},
  470. reqMsg: &ipb.Item{Name: "bar", Value: 1},
  471. f: func(method string, m proto.Message) error {
  472. item := m.(*ipb.Item)
  473. item.Name = "foo"
  474. return nil
  475. },
  476. },
  477. {
  478. name: "Errors should cause the RPC send to fail",
  479. msg: &ipb.Item{},
  480. f: func(_ string, _ proto.Message) error {
  481. return errors.New("err")
  482. },
  483. wantErr: true,
  484. },
  485. }
  486. for _, tc := range tests {
  487. // Wrap test cases in a func so defers execute correctly.
  488. func() {
  489. srv := newIntStoreServer()
  490. defer srv.stop()
  491. var b bytes.Buffer
  492. rec, err := NewRecorderWriter(&b, nil)
  493. if err != nil {
  494. t.Error(err)
  495. return
  496. }
  497. ctx := context.Background()
  498. conn, err := grpc.DialContext(ctx, srv.Addr, append([]grpc.DialOption{grpc.WithInsecure()}, rec.DialOptions()...)...)
  499. if err != nil {
  500. t.Error(err)
  501. return
  502. }
  503. defer conn.Close()
  504. client := ipb.NewIntStoreClient(conn)
  505. _, err = client.Set(ctx, tc.msg)
  506. if err != nil {
  507. t.Error(err)
  508. return
  509. }
  510. rec.Close()
  511. rep, err := NewReplayerReader(&b)
  512. if err != nil {
  513. t.Error(err)
  514. return
  515. }
  516. rep.BeforeFunc = tc.f
  517. conn, err = grpc.DialContext(ctx, srv.Addr, append([]grpc.DialOption{grpc.WithInsecure()}, rep.DialOptions()...)...)
  518. if err != nil {
  519. t.Error(err)
  520. return
  521. }
  522. defer conn.Close()
  523. client = ipb.NewIntStoreClient(conn)
  524. _, err = client.Set(ctx, tc.reqMsg)
  525. switch {
  526. case err != nil && !tc.wantErr:
  527. t.Error(err)
  528. case err == nil && tc.wantErr:
  529. t.Errorf("got nil; want error")
  530. }
  531. }()
  532. }
  533. }