You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

743 lines
19 KiB

  1. // Copyright 2017 Google LLC
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package rpcreplay
  15. import (
  16. "bufio"
  17. "context"
  18. "encoding/binary"
  19. "errors"
  20. "fmt"
  21. "io"
  22. "log"
  23. "net"
  24. "os"
  25. "sync"
  26. pb "cloud.google.com/go/rpcreplay/proto/rpcreplay"
  27. "github.com/golang/protobuf/proto"
  28. "github.com/golang/protobuf/ptypes"
  29. "github.com/golang/protobuf/ptypes/any"
  30. spb "google.golang.org/genproto/googleapis/rpc/status"
  31. "google.golang.org/grpc"
  32. "google.golang.org/grpc/metadata"
  33. "google.golang.org/grpc/status"
  34. )
  35. // A Recorder records RPCs for later playback.
  36. type Recorder struct {
  37. mu sync.Mutex
  38. w *bufio.Writer
  39. f *os.File
  40. next int
  41. err error
  42. // BeforeFunc defines a function that can inspect and modify requests and responses
  43. // written to the replay file. It does not modify messages sent to the service.
  44. // It is run once before a request is written to the replay file, and once before a response
  45. // is written to the replay file.
  46. // The function is called with the method name and the message that triggered the callback.
  47. // If the function returns an error, the error will be returned to the client.
  48. // This is only executed for unary RPCs; streaming RPCs are not supported.
  49. BeforeFunc func(string, proto.Message) error
  50. }
  51. // NewRecorder creates a recorder that writes to filename. The file will
  52. // also store the initial bytes for retrieval during replay.
  53. //
  54. // You must call Close on the Recorder to ensure that all data is written.
  55. func NewRecorder(filename string, initial []byte) (*Recorder, error) {
  56. f, err := os.Create(filename)
  57. if err != nil {
  58. return nil, err
  59. }
  60. rec, err := NewRecorderWriter(f, initial)
  61. if err != nil {
  62. _ = f.Close()
  63. return nil, err
  64. }
  65. rec.f = f
  66. return rec, nil
  67. }
  68. // NewRecorderWriter creates a recorder that writes to w. The initial
  69. // bytes will also be written to w for retrieval during replay.
  70. //
  71. // You must call Close on the Recorder to ensure that all data is written.
  72. func NewRecorderWriter(w io.Writer, initial []byte) (*Recorder, error) {
  73. bw := bufio.NewWriter(w)
  74. if err := writeHeader(bw, initial); err != nil {
  75. return nil, err
  76. }
  77. return &Recorder{w: bw, next: 1}, nil
  78. }
  79. // DialOptions returns the options that must be passed to grpc.Dial
  80. // to enable recording.
  81. func (r *Recorder) DialOptions() []grpc.DialOption {
  82. return []grpc.DialOption{
  83. grpc.WithUnaryInterceptor(r.interceptUnary),
  84. grpc.WithStreamInterceptor(r.interceptStream),
  85. }
  86. }
  87. // Close saves any unwritten information.
  88. func (r *Recorder) Close() error {
  89. r.mu.Lock()
  90. defer r.mu.Unlock()
  91. if r.err != nil {
  92. return r.err
  93. }
  94. err := r.w.Flush()
  95. if r.f != nil {
  96. if err2 := r.f.Close(); err == nil {
  97. err = err2
  98. }
  99. }
  100. return err
  101. }
  102. // Intercepts all unary (non-stream) RPCs.
  103. func (r *Recorder) interceptUnary(ctx context.Context, method string, req, res interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
  104. ereq := &entry{
  105. kind: pb.Entry_REQUEST,
  106. method: method,
  107. msg: message{msg: proto.Clone(req.(proto.Message))},
  108. }
  109. if r.BeforeFunc != nil {
  110. if err := r.BeforeFunc(method, ereq.msg.msg); err != nil {
  111. return err
  112. }
  113. }
  114. refIndex, err := r.writeEntry(ereq)
  115. if err != nil {
  116. return err
  117. }
  118. ierr := invoker(ctx, method, req, res, cc, opts...)
  119. eres := &entry{
  120. kind: pb.Entry_RESPONSE,
  121. refIndex: refIndex,
  122. }
  123. // If the error is not a gRPC status, then something more
  124. // serious is wrong. More significantly, we have no way
  125. // of serializing an arbitrary error. So just return it
  126. // without recording the response.
  127. if _, ok := status.FromError(ierr); !ok {
  128. r.mu.Lock()
  129. r.err = fmt.Errorf("saw non-status error in %s response: %v (%T)", method, ierr, ierr)
  130. r.mu.Unlock()
  131. return ierr
  132. }
  133. eres.msg.set(proto.Clone(res.(proto.Message)), ierr)
  134. if r.BeforeFunc != nil {
  135. if err := r.BeforeFunc(method, eres.msg.msg); err != nil {
  136. return err
  137. }
  138. }
  139. if _, err := r.writeEntry(eres); err != nil {
  140. return err
  141. }
  142. return ierr
  143. }
  144. func (r *Recorder) writeEntry(e *entry) (int, error) {
  145. r.mu.Lock()
  146. defer r.mu.Unlock()
  147. if r.err != nil {
  148. return 0, r.err
  149. }
  150. err := writeEntry(r.w, e)
  151. if err != nil {
  152. r.err = err
  153. return 0, err
  154. }
  155. n := r.next
  156. r.next++
  157. return n, nil
  158. }
  159. func (r *Recorder) interceptStream(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
  160. cstream, serr := streamer(ctx, desc, cc, method, opts...)
  161. e := &entry{
  162. kind: pb.Entry_CREATE_STREAM,
  163. method: method,
  164. }
  165. e.msg.set(nil, serr)
  166. refIndex, err := r.writeEntry(e)
  167. if err != nil {
  168. return nil, err
  169. }
  170. return &recClientStream{
  171. ctx: ctx,
  172. rec: r,
  173. cstream: cstream,
  174. refIndex: refIndex,
  175. }, serr
  176. }
  177. // A recClientStream implements the gprc.ClientStream interface.
  178. // It behaves exactly like the default ClientStream, but also
  179. // records all messages sent and received.
  180. type recClientStream struct {
  181. ctx context.Context
  182. rec *Recorder
  183. cstream grpc.ClientStream
  184. refIndex int
  185. }
  186. func (rcs *recClientStream) Context() context.Context { return rcs.ctx }
  187. func (rcs *recClientStream) SendMsg(m interface{}) error {
  188. serr := rcs.cstream.SendMsg(m)
  189. e := &entry{
  190. kind: pb.Entry_SEND,
  191. refIndex: rcs.refIndex,
  192. }
  193. e.msg.set(m, serr)
  194. if _, err := rcs.rec.writeEntry(e); err != nil {
  195. return err
  196. }
  197. return serr
  198. }
  199. func (rcs *recClientStream) RecvMsg(m interface{}) error {
  200. serr := rcs.cstream.RecvMsg(m)
  201. e := &entry{
  202. kind: pb.Entry_RECV,
  203. refIndex: rcs.refIndex,
  204. }
  205. e.msg.set(m, serr)
  206. if _, err := rcs.rec.writeEntry(e); err != nil {
  207. return err
  208. }
  209. return serr
  210. }
  211. func (rcs *recClientStream) Header() (metadata.MD, error) {
  212. // TODO(jba): record.
  213. return rcs.cstream.Header()
  214. }
  215. func (rcs *recClientStream) Trailer() metadata.MD {
  216. // TODO(jba): record.
  217. return rcs.cstream.Trailer()
  218. }
  219. func (rcs *recClientStream) CloseSend() error {
  220. // TODO(jba): record.
  221. return rcs.cstream.CloseSend()
  222. }
  223. // A Replayer replays a set of RPCs saved by a Recorder.
  224. type Replayer struct {
  225. initial []byte // initial state
  226. log func(format string, v ...interface{}) // for debugging
  227. mu sync.Mutex
  228. calls []*call
  229. streams []*stream
  230. // BeforeFunc defines a function that can inspect and modify requests before they
  231. // are matched for responses from the replay file.
  232. // The function is called with the method name and the message that triggered the callback.
  233. // If the function returns an error, the error will be returned to the client.
  234. // This is only executed for unary RPCs; streaming RPCs are not supported.
  235. BeforeFunc func(string, proto.Message) error
  236. }
  237. // A call represents a unary RPC, with a request and response (or error).
  238. type call struct {
  239. method string
  240. request proto.Message
  241. response message
  242. }
  243. // A stream represents a gRPC stream, with an initial create-stream call, followed by
  244. // zero or more sends and/or receives.
  245. type stream struct {
  246. method string
  247. createIndex int
  248. createErr error // error from create call
  249. sends []message
  250. recvs []message
  251. }
  252. // NewReplayer creates a Replayer that reads from filename.
  253. func NewReplayer(filename string) (*Replayer, error) {
  254. f, err := os.Open(filename)
  255. if err != nil {
  256. return nil, err
  257. }
  258. defer f.Close()
  259. return NewReplayerReader(f)
  260. }
  261. // NewReplayerReader creates a Replayer that reads from r.
  262. func NewReplayerReader(r io.Reader) (*Replayer, error) {
  263. rep := &Replayer{
  264. log: func(string, ...interface{}) {},
  265. }
  266. if err := rep.read(r); err != nil {
  267. return nil, err
  268. }
  269. return rep, nil
  270. }
  271. // read reads the stream of recorded entries.
  272. // It matches requests with responses, with each pair grouped
  273. // into a call struct.
  274. func (rep *Replayer) read(r io.Reader) error {
  275. r = bufio.NewReader(r)
  276. bytes, err := readHeader(r)
  277. if err != nil {
  278. return err
  279. }
  280. rep.initial = bytes
  281. callsByIndex := map[int]*call{}
  282. streamsByIndex := map[int]*stream{}
  283. for i := 1; ; i++ {
  284. e, err := readEntry(r)
  285. if err != nil {
  286. return err
  287. }
  288. if e == nil {
  289. break
  290. }
  291. switch e.kind {
  292. case pb.Entry_REQUEST:
  293. callsByIndex[i] = &call{
  294. method: e.method,
  295. request: e.msg.msg,
  296. }
  297. case pb.Entry_RESPONSE:
  298. call := callsByIndex[e.refIndex]
  299. if call == nil {
  300. return fmt.Errorf("replayer: no request for response #%d", i)
  301. }
  302. delete(callsByIndex, e.refIndex)
  303. call.response = e.msg
  304. rep.calls = append(rep.calls, call)
  305. case pb.Entry_CREATE_STREAM:
  306. s := &stream{method: e.method, createIndex: i}
  307. s.createErr = e.msg.err
  308. streamsByIndex[i] = s
  309. rep.streams = append(rep.streams, s)
  310. case pb.Entry_SEND:
  311. s := streamsByIndex[e.refIndex]
  312. if s == nil {
  313. return fmt.Errorf("replayer: no stream for send #%d", i)
  314. }
  315. s.sends = append(s.sends, e.msg)
  316. case pb.Entry_RECV:
  317. s := streamsByIndex[e.refIndex]
  318. if s == nil {
  319. return fmt.Errorf("replayer: no stream for recv #%d", i)
  320. }
  321. s.recvs = append(s.recvs, e.msg)
  322. default:
  323. return fmt.Errorf("replayer: unknown kind %s", e.kind)
  324. }
  325. }
  326. if len(callsByIndex) > 0 {
  327. return fmt.Errorf("replayer: %d unmatched requests", len(callsByIndex))
  328. }
  329. return nil
  330. }
  331. // DialOptions returns the options that must be passed to grpc.Dial
  332. // to enable replaying.
  333. func (rep *Replayer) DialOptions() []grpc.DialOption {
  334. return []grpc.DialOption{
  335. // On replay, we make no RPCs, which means the connection may be closed
  336. // before the normally async Dial completes. Making the Dial synchronous
  337. // fixes that.
  338. grpc.WithBlock(),
  339. grpc.WithUnaryInterceptor(rep.interceptUnary),
  340. grpc.WithStreamInterceptor(rep.interceptStream),
  341. }
  342. }
  343. // Connection returns a fake gRPC connection suitable for replaying.
  344. func (rep *Replayer) Connection() (*grpc.ClientConn, error) {
  345. // We don't need an actual connection, not even a loopback one.
  346. // But we do need something to attach gRPC interceptors to.
  347. // So we start a local server and connect to it, then close it down.
  348. srv := grpc.NewServer()
  349. l, err := net.Listen("tcp", "127.0.0.1:0")
  350. if err != nil {
  351. return nil, err
  352. }
  353. go func() {
  354. if err := srv.Serve(l); err != nil {
  355. panic(err) // we should never get an error because we just connect and stop
  356. }
  357. }()
  358. conn, err := grpc.Dial(l.Addr().String(),
  359. append([]grpc.DialOption{grpc.WithInsecure()}, rep.DialOptions()...)...)
  360. if err != nil {
  361. return nil, err
  362. }
  363. conn.Close()
  364. srv.Stop()
  365. return conn, nil
  366. }
  367. // Initial returns the initial state saved by the Recorder.
  368. func (rep *Replayer) Initial() []byte { return rep.initial }
  369. // SetLogFunc sets a function to be used for debug logging. The function
  370. // should be safe to be called from multiple goroutines.
  371. func (rep *Replayer) SetLogFunc(f func(format string, v ...interface{})) {
  372. rep.log = f
  373. }
  374. // Close closes the Replayer.
  375. func (rep *Replayer) Close() error {
  376. return nil
  377. }
  378. func (rep *Replayer) interceptUnary(_ context.Context, method string, req, res interface{}, _ *grpc.ClientConn, _ grpc.UnaryInvoker, _ ...grpc.CallOption) error {
  379. mreq := req.(proto.Message)
  380. if rep.BeforeFunc != nil {
  381. if err := rep.BeforeFunc(method, mreq); err != nil {
  382. return err
  383. }
  384. }
  385. rep.log("request %s (%s)", method, req)
  386. call := rep.extractCall(method, mreq)
  387. if call == nil {
  388. return fmt.Errorf("replayer: request not found: %s", mreq)
  389. }
  390. rep.log("returning %v", call.response)
  391. if call.response.err != nil {
  392. return call.response.err
  393. }
  394. proto.Merge(res.(proto.Message), call.response.msg) // copy msg into res
  395. return nil
  396. }
  397. func (rep *Replayer) interceptStream(ctx context.Context, _ *grpc.StreamDesc, _ *grpc.ClientConn, method string, _ grpc.Streamer, _ ...grpc.CallOption) (grpc.ClientStream, error) {
  398. rep.log("create-stream %s", method)
  399. str := rep.extractStream(method)
  400. if str == nil {
  401. return nil, fmt.Errorf("replayer: stream not found for method %s", method)
  402. }
  403. if str.createErr != nil {
  404. return nil, str.createErr
  405. }
  406. return &repClientStream{ctx: ctx, str: str}, nil
  407. }
  408. type repClientStream struct {
  409. ctx context.Context
  410. str *stream
  411. }
  412. func (rcs *repClientStream) Context() context.Context { return rcs.ctx }
  413. func (rcs *repClientStream) SendMsg(m interface{}) error {
  414. if len(rcs.str.sends) == 0 {
  415. return fmt.Errorf("replayer: no more sends for stream %s, created at index %d",
  416. rcs.str.method, rcs.str.createIndex)
  417. }
  418. // TODO(jba): Do not assume that the sends happen in the same order on replay.
  419. msg := rcs.str.sends[0]
  420. rcs.str.sends = rcs.str.sends[1:]
  421. return msg.err
  422. }
  423. func (rcs *repClientStream) RecvMsg(m interface{}) error {
  424. if len(rcs.str.recvs) == 0 {
  425. return fmt.Errorf("replayer: no more receives for stream %s, created at index %d",
  426. rcs.str.method, rcs.str.createIndex)
  427. }
  428. msg := rcs.str.recvs[0]
  429. rcs.str.recvs = rcs.str.recvs[1:]
  430. if msg.err != nil {
  431. return msg.err
  432. }
  433. proto.Merge(m.(proto.Message), msg.msg) // copy msg into m
  434. return nil
  435. }
  436. func (rcs *repClientStream) Header() (metadata.MD, error) {
  437. log.Printf("replay: stream metadata not supported")
  438. return nil, nil
  439. }
  440. func (rcs *repClientStream) Trailer() metadata.MD {
  441. log.Printf("replay: stream metadata not supported")
  442. return nil
  443. }
  444. func (rcs *repClientStream) CloseSend() error {
  445. return nil
  446. }
  447. // extractCall finds the first call in the list with the same method
  448. // and request. It returns nil if it can't find such a call.
  449. func (rep *Replayer) extractCall(method string, req proto.Message) *call {
  450. rep.mu.Lock()
  451. defer rep.mu.Unlock()
  452. for i, call := range rep.calls {
  453. if call == nil {
  454. continue
  455. }
  456. if method == call.method && proto.Equal(req, call.request) {
  457. rep.calls[i] = nil // nil out this call so we don't reuse it
  458. return call
  459. }
  460. }
  461. return nil
  462. }
  463. func (rep *Replayer) extractStream(method string) *stream {
  464. rep.mu.Lock()
  465. defer rep.mu.Unlock()
  466. for i, stream := range rep.streams {
  467. if stream == nil {
  468. continue
  469. }
  470. if method == stream.method {
  471. rep.streams[i] = nil
  472. return stream
  473. }
  474. }
  475. return nil
  476. }
  477. // Fprint reads the entries from filename and writes them to w in human-readable form.
  478. // It is intended for debugging.
  479. func Fprint(w io.Writer, filename string) error {
  480. f, err := os.Open(filename)
  481. if err != nil {
  482. return err
  483. }
  484. defer f.Close()
  485. return FprintReader(w, f)
  486. }
  487. // FprintReader reads the entries from r and writes them to w in human-readable form.
  488. // It is intended for debugging.
  489. func FprintReader(w io.Writer, r io.Reader) error {
  490. initial, err := readHeader(r)
  491. if err != nil {
  492. return err
  493. }
  494. fmt.Fprintf(w, "initial state: %q\n", string(initial))
  495. for i := 1; ; i++ {
  496. e, err := readEntry(r)
  497. if err != nil {
  498. return err
  499. }
  500. if e == nil {
  501. return nil
  502. }
  503. s := "message"
  504. if e.msg.err != nil {
  505. s = "error"
  506. }
  507. fmt.Fprintf(w, "#%d: kind: %s, method: %s, ref index: %d, %s:\n",
  508. i, e.kind, e.method, e.refIndex, s)
  509. if e.msg.err == nil {
  510. if err := proto.MarshalText(w, e.msg.msg); err != nil {
  511. return err
  512. }
  513. } else {
  514. fmt.Fprintf(w, "%v\n", e.msg.err)
  515. }
  516. }
  517. }
  518. // An entry holds one gRPC action (request, response, etc.).
  519. type entry struct {
  520. kind pb.Entry_Kind
  521. method string
  522. msg message
  523. refIndex int // index of corresponding request or create-stream
  524. }
  525. func (e1 *entry) equal(e2 *entry) bool {
  526. if e1 == nil && e2 == nil {
  527. return true
  528. }
  529. if e1 == nil || e2 == nil {
  530. return false
  531. }
  532. return e1.kind == e2.kind &&
  533. e1.method == e2.method &&
  534. proto.Equal(e1.msg.msg, e2.msg.msg) &&
  535. errEqual(e1.msg.err, e2.msg.err) &&
  536. e1.refIndex == e2.refIndex
  537. }
  538. func errEqual(e1, e2 error) bool {
  539. if e1 == e2 {
  540. return true
  541. }
  542. s1, ok1 := status.FromError(e1)
  543. s2, ok2 := status.FromError(e2)
  544. if !ok1 || !ok2 {
  545. return false
  546. }
  547. return proto.Equal(s1.Proto(), s2.Proto())
  548. }
  549. // message holds either a single proto.Message or an error.
  550. type message struct {
  551. msg proto.Message
  552. err error
  553. }
  554. func (m *message) set(msg interface{}, err error) {
  555. m.err = err
  556. if err != io.EOF && msg != nil {
  557. m.msg = msg.(proto.Message)
  558. }
  559. }
  // File format:
  //   header
  //   sequence of Entry protos
  //
  // Header format:
  //   magic string
  //   a record containing the bytes of the initial state

  // magic is the string that begins every replay file.
  const magic = "RPCReplay"
  568. func writeHeader(w io.Writer, initial []byte) error {
  569. if _, err := io.WriteString(w, magic); err != nil {
  570. return err
  571. }
  572. return writeRecord(w, initial)
  573. }
  574. func readHeader(r io.Reader) ([]byte, error) {
  575. var buf [len(magic)]byte
  576. if _, err := io.ReadFull(r, buf[:]); err != nil {
  577. if err == io.EOF {
  578. err = errors.New("rpcreplay: empty replay file")
  579. }
  580. return nil, err
  581. }
  582. if string(buf[:]) != magic {
  583. return nil, errors.New("rpcreplay: not a replay file (does not begin with magic string)")
  584. }
  585. bytes, err := readRecord(r)
  586. if err == io.EOF {
  587. err = errors.New("rpcreplay: missing initial state")
  588. }
  589. return bytes, err
  590. }
  591. func writeEntry(w io.Writer, e *entry) error {
  592. var m proto.Message
  593. if e.msg.err != nil && e.msg.err != io.EOF {
  594. s, ok := status.FromError(e.msg.err)
  595. if !ok {
  596. return fmt.Errorf("rpcreplay: error %v is not a Status", e.msg.err)
  597. }
  598. m = s.Proto()
  599. } else {
  600. m = e.msg.msg
  601. }
  602. var a *any.Any
  603. var err error
  604. if m != nil {
  605. a, err = ptypes.MarshalAny(m)
  606. if err != nil {
  607. return err
  608. }
  609. }
  610. pe := &pb.Entry{
  611. Kind: e.kind,
  612. Method: e.method,
  613. Message: a,
  614. IsError: e.msg.err != nil,
  615. RefIndex: int32(e.refIndex),
  616. }
  617. bytes, err := proto.Marshal(pe)
  618. if err != nil {
  619. return err
  620. }
  621. return writeRecord(w, bytes)
  622. }
  623. func readEntry(r io.Reader) (*entry, error) {
  624. buf, err := readRecord(r)
  625. if err == io.EOF {
  626. return nil, nil
  627. }
  628. if err != nil {
  629. return nil, err
  630. }
  631. var pe pb.Entry
  632. if err := proto.Unmarshal(buf, &pe); err != nil {
  633. return nil, err
  634. }
  635. var msg message
  636. if pe.Message != nil {
  637. var any ptypes.DynamicAny
  638. if err := ptypes.UnmarshalAny(pe.Message, &any); err != nil {
  639. return nil, err
  640. }
  641. if pe.IsError {
  642. msg.err = status.ErrorProto(any.Message.(*spb.Status))
  643. } else {
  644. msg.msg = any.Message
  645. }
  646. } else if pe.IsError {
  647. msg.err = io.EOF
  648. } else if pe.Kind != pb.Entry_CREATE_STREAM {
  649. return nil, errors.New("rpcreplay: entry with nil message and false is_error")
  650. }
  651. return &entry{
  652. kind: pe.Kind,
  653. method: pe.Method,
  654. msg: msg,
  655. refIndex: int(pe.RefIndex),
  656. }, nil
  657. }
  // A record consists of an unsigned 32-bit little-endian length L followed by L
  // bytes.

  // writeRecord writes data to w as a single record: the 4-byte length prefix
  // first, then the bytes themselves.
  func writeRecord(w io.Writer, data []byte) error {
  	var prefix [4]byte
  	binary.LittleEndian.PutUint32(prefix[:], uint32(len(data)))
  	if _, err := w.Write(prefix[:]); err != nil {
  		return err
  	}
  	_, err := w.Write(data)
  	return err
  }
  // readRecord reads one record from r: a 4-byte little-endian length followed
  // by that many bytes, which it returns.
  //
  // It returns io.EOF only when r is exhausted before any byte of the record
  // is read, so callers can treat io.EOF as a clean end of input. A record
  // that is cut short, whether inside the length prefix or anywhere in the
  // body, yields io.ErrUnexpectedEOF.
  func readRecord(r io.Reader) ([]byte, error) {
  	var size uint32
  	if err := binary.Read(r, binary.LittleEndian, &size); err != nil {
  		return nil, err
  	}
  	buf := make([]byte, size)
  	if _, err := io.ReadFull(r, buf); err != nil {
  		// io.ReadFull reports plain io.EOF when the body is missing
  		// entirely. The length prefix was already consumed, so that is a
  		// truncated record, not a clean end of input.
  		if err == io.EOF {
  			err = io.ErrUnexpectedEOF
  		}
  		return nil, err
  	}
  	return buf, nil
  }