You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

632 lines
17 KiB

  1. // Copyright 2016 Google LLC
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. // TODO(jba): test that OnError is getting called appropriately.
  15. package logging_test
  16. import (
  17. "context"
  18. "flag"
  19. "fmt"
  20. "log"
  21. "math/rand"
  22. "os"
  23. "strings"
  24. "sync"
  25. "sync/atomic"
  26. "testing"
  27. "time"
  28. cinternal "cloud.google.com/go/internal"
  29. "cloud.google.com/go/internal/testutil"
  30. "cloud.google.com/go/internal/uid"
  31. "cloud.google.com/go/logging"
  32. ltesting "cloud.google.com/go/logging/internal/testing"
  33. "cloud.google.com/go/logging/logadmin"
  34. gax "github.com/googleapis/gax-go/v2"
  35. "golang.org/x/oauth2"
  36. "google.golang.org/api/iterator"
  37. "google.golang.org/api/option"
  38. mrpb "google.golang.org/genproto/googleapis/api/monitoredres"
  39. "google.golang.org/grpc"
  40. "google.golang.org/grpc/codes"
  41. "google.golang.org/grpc/status"
  42. )
// testLogIDPrefix is the prefix handed to uid.NewSpace for generating test
// log IDs. TestLogsAndDelete only deletes logs whose ID starts with it.
const testLogIDPrefix = "GO-LOGGING-CLIENT/TEST-LOG"

// Package-level test state. TestMain assigns client, aclient, testProjectID,
// errorc, ctx, clean, newClients and integrationTest before running the tests;
// initLogs assigns testLogID and testFilter.
var (
	client        *logging.Client  // shared logging client, created in TestMain
	aclient       *logadmin.Client // shared admin client, created in TestMain
	testProjectID string           // project the shared clients are created for
	testLogID     string           // log ID for the current test; regenerated by initLogs
	testFilter    string           // entry filter matching testLogID; rebuilt by initLogs
	errorc        chan error       // receives errors passed to client.OnError
	ctx           context.Context
	// Adjust the fields of a FullEntry received from the production service
	// before comparing it with the expected result. We can't correctly
	// compare certain fields, like times or server-generated IDs.
	clean func(*logging.Entry)
	// Create a new client with the given project ID.
	newClients func(ctx context.Context, projectID string) (*logging.Client, *logadmin.Client)
	// uids generates the log IDs that initLogs stores in testLogID.
	uids = uid.NewSpace(testLogIDPrefix, nil)
	// If true, this test is using the production service, not a fake.
	integrationTest bool
)
  62. func testNow() time.Time {
  63. return time.Unix(1000, 0)
  64. }
  65. func TestMain(m *testing.M) {
  66. flag.Parse() // needed for testing.Short()
  67. ctx = context.Background()
  68. testProjectID = testutil.ProjID()
  69. errorc = make(chan error, 100)
  70. if testProjectID == "" || testing.Short() {
  71. integrationTest = false
  72. if testProjectID != "" {
  73. log.Print("Integration tests skipped in short mode (using fake instead)")
  74. }
  75. testProjectID = ltesting.ValidProjectID
  76. clean = func(e *logging.Entry) {
  77. // Remove the insert ID for consistency with the integration test.
  78. e.InsertID = ""
  79. }
  80. addr, err := ltesting.NewServer()
  81. if err != nil {
  82. log.Fatalf("creating fake server: %v", err)
  83. }
  84. logging.SetNow(testNow)
  85. newClients = func(ctx context.Context, parent string) (*logging.Client, *logadmin.Client) {
  86. conn, err := grpc.Dial(addr, grpc.WithInsecure())
  87. if err != nil {
  88. log.Fatalf("dialing %q: %v", addr, err)
  89. }
  90. c, err := logging.NewClient(ctx, parent, option.WithGRPCConn(conn))
  91. if err != nil {
  92. log.Fatalf("creating client for fake at %q: %v", addr, err)
  93. }
  94. ac, err := logadmin.NewClient(ctx, parent, option.WithGRPCConn(conn))
  95. if err != nil {
  96. log.Fatalf("creating client for fake at %q: %v", addr, err)
  97. }
  98. return c, ac
  99. }
  100. } else {
  101. integrationTest = true
  102. clean = func(e *logging.Entry) {
  103. // We cannot compare timestamps, so set them to the test time.
  104. // Also, remove the insert ID added by the service.
  105. e.Timestamp = testNow().UTC()
  106. e.InsertID = ""
  107. }
  108. ts := testutil.TokenSource(ctx, logging.AdminScope)
  109. if ts == nil {
  110. log.Fatal("The project key must be set. See CONTRIBUTING.md for details")
  111. }
  112. log.Printf("running integration tests with project %s", testProjectID)
  113. newClients = func(ctx context.Context, parent string) (*logging.Client, *logadmin.Client) {
  114. c, err := logging.NewClient(ctx, parent, option.WithTokenSource(ts))
  115. if err != nil {
  116. log.Fatalf("creating prod client: %v", err)
  117. }
  118. ac, err := logadmin.NewClient(ctx, parent, option.WithTokenSource(ts))
  119. if err != nil {
  120. log.Fatalf("creating prod client: %v", err)
  121. }
  122. return c, ac
  123. }
  124. }
  125. client, aclient = newClients(ctx, testProjectID)
  126. client.OnError = func(e error) { errorc <- e }
  127. exit := m.Run()
  128. os.Exit(exit)
  129. }
  130. func initLogs() {
  131. testLogID = uids.New()
  132. hourAgo := time.Now().Add(-1 * time.Hour).UTC()
  133. testFilter = fmt.Sprintf(`logName = "projects/%s/logs/%s" AND
  134. timestamp >= "%s"`,
  135. testProjectID, strings.Replace(testLogID, "/", "%2F", -1), hourAgo.Format(time.RFC3339))
  136. }
  137. func TestLogSync(t *testing.T) {
  138. // TODO(deklerk) Un-flake and re-enable
  139. t.Skip("Inherently flaky")
  140. initLogs() // Generate new testLogID
  141. ctx := context.Background()
  142. lg := client.Logger(testLogID)
  143. err := lg.LogSync(ctx, logging.Entry{Payload: "hello"})
  144. if err != nil {
  145. t.Fatal(err)
  146. }
  147. err = lg.LogSync(ctx, logging.Entry{Payload: "goodbye"})
  148. if err != nil {
  149. t.Fatal(err)
  150. }
  151. // Allow overriding the MonitoredResource.
  152. err = lg.LogSync(ctx, logging.Entry{Payload: "mr", Resource: &mrpb.MonitoredResource{Type: "global"}})
  153. if err != nil {
  154. t.Fatal(err)
  155. }
  156. want := []*logging.Entry{
  157. entryForTesting("hello"),
  158. entryForTesting("goodbye"),
  159. entryForTesting("mr"),
  160. }
  161. var got []*logging.Entry
  162. ok := waitFor(func() bool {
  163. got, err = allTestLogEntries(ctx)
  164. if err != nil {
  165. t.Log("fetching log entries: ", err)
  166. return false
  167. }
  168. return len(got) == len(want)
  169. })
  170. if !ok {
  171. t.Fatalf("timed out; got: %d, want: %d\n", len(got), len(want))
  172. }
  173. if msg, ok := compareEntries(got, want); !ok {
  174. t.Error(msg)
  175. }
  176. }
  177. func TestLogAndEntries(t *testing.T) {
  178. // TODO(deklerk) Un-flake and re-enable
  179. t.Skip("Inherently flaky")
  180. initLogs() // Generate new testLogID
  181. ctx := context.Background()
  182. payloads := []string{"p1", "p2", "p3", "p4", "p5"}
  183. lg := client.Logger(testLogID)
  184. for _, p := range payloads {
  185. // Use the insert ID to guarantee iteration order.
  186. lg.Log(logging.Entry{Payload: p, InsertID: p})
  187. }
  188. if err := lg.Flush(); err != nil {
  189. t.Fatal(err)
  190. }
  191. var want []*logging.Entry
  192. for _, p := range payloads {
  193. want = append(want, entryForTesting(p))
  194. }
  195. var got []*logging.Entry
  196. ok := waitFor(func() bool {
  197. var err error
  198. got, err = allTestLogEntries(ctx)
  199. if err != nil {
  200. t.Log("fetching log entries: ", err)
  201. return false
  202. }
  203. return len(got) == len(want)
  204. })
  205. if !ok {
  206. t.Fatalf("timed out; got: %d, want: %d\n", len(got), len(want))
  207. }
  208. if msg, ok := compareEntries(got, want); !ok {
  209. t.Error(msg)
  210. }
  211. }
  212. func TestContextFunc(t *testing.T) {
  213. initLogs()
  214. var contextFuncCalls, cleanupCalls int32 //atomic
  215. lg := client.Logger(testLogID, logging.ContextFunc(func() (context.Context, func()) {
  216. atomic.AddInt32(&contextFuncCalls, 1)
  217. return context.Background(), func() { atomic.AddInt32(&cleanupCalls, 1) }
  218. }))
  219. lg.Log(logging.Entry{Payload: "p"})
  220. if err := lg.Flush(); err != nil {
  221. t.Fatal(err)
  222. }
  223. got1 := atomic.LoadInt32(&contextFuncCalls)
  224. got2 := atomic.LoadInt32(&cleanupCalls)
  225. if got1 != 1 || got1 != got2 {
  226. t.Errorf("got %d calls to context func, %d calls to cleanup func; want 1, 1", got1, got2)
  227. }
  228. }
  229. // compareEntries compares most fields list of Entries against expected. compareEntries does not compare:
  230. // - HTTPRequest
  231. // - Operation
  232. // - Resource
  233. // - SourceLocation
  234. func compareEntries(got, want []*logging.Entry) (string, bool) {
  235. if len(got) != len(want) {
  236. return fmt.Sprintf("got %d entries, want %d", len(got), len(want)), false
  237. }
  238. for i := range got {
  239. if !compareEntry(got[i], want[i]) {
  240. return fmt.Sprintf("#%d:\ngot %+v\nwant %+v", i, got[i], want[i]), false
  241. }
  242. }
  243. return "", true
  244. }
  245. func compareEntry(got, want *logging.Entry) bool {
  246. if got.Timestamp.Unix() != want.Timestamp.Unix() {
  247. return false
  248. }
  249. if got.Severity != want.Severity {
  250. return false
  251. }
  252. if !ltesting.PayloadEqual(got.Payload, want.Payload) {
  253. return false
  254. }
  255. if !testutil.Equal(got.Labels, want.Labels) {
  256. return false
  257. }
  258. if got.InsertID != want.InsertID {
  259. return false
  260. }
  261. if got.LogName != want.LogName {
  262. return false
  263. }
  264. return true
  265. }
  266. func entryForTesting(payload interface{}) *logging.Entry {
  267. return &logging.Entry{
  268. Timestamp: testNow().UTC(),
  269. Payload: payload,
  270. LogName: "projects/" + testProjectID + "/logs/" + testLogID,
  271. Resource: &mrpb.MonitoredResource{Type: "global", Labels: map[string]string{"project_id": testProjectID}},
  272. }
  273. }
  274. func allTestLogEntries(ctx context.Context) ([]*logging.Entry, error) {
  275. return allEntries(ctx, aclient, testFilter)
  276. }
  277. func allEntries(ctx context.Context, aclient *logadmin.Client, filter string) ([]*logging.Entry, error) {
  278. var es []*logging.Entry
  279. it := aclient.Entries(ctx, logadmin.Filter(filter))
  280. for {
  281. e, err := cleanNext(it)
  282. switch err {
  283. case nil:
  284. es = append(es, e)
  285. case iterator.Done:
  286. return es, nil
  287. default:
  288. return nil, err
  289. }
  290. }
  291. }
  292. func cleanNext(it *logadmin.EntryIterator) (*logging.Entry, error) {
  293. e, err := it.Next()
  294. if err != nil {
  295. return nil, err
  296. }
  297. clean(e)
  298. return e, nil
  299. }
  300. func TestStandardLogger(t *testing.T) {
  301. // TODO(deklerk) Un-flake and re-enable
  302. t.Skip("Inherently flaky")
  303. initLogs() // Generate new testLogID
  304. ctx := context.Background()
  305. lg := client.Logger(testLogID)
  306. slg := lg.StandardLogger(logging.Info)
  307. if slg != lg.StandardLogger(logging.Info) {
  308. t.Error("There should be only one standard logger at each severity.")
  309. }
  310. if slg == lg.StandardLogger(logging.Debug) {
  311. t.Error("There should be a different standard logger for each severity.")
  312. }
  313. slg.Print("info")
  314. if err := lg.Flush(); err != nil {
  315. t.Fatal(err)
  316. }
  317. var got []*logging.Entry
  318. ok := waitFor(func() bool {
  319. var err error
  320. got, err = allTestLogEntries(ctx)
  321. if err != nil {
  322. t.Log("fetching log entries: ", err)
  323. return false
  324. }
  325. return len(got) == 1
  326. })
  327. if !ok {
  328. t.Fatalf("timed out; got: %d, want: %d\n", len(got), 1)
  329. }
  330. if len(got) != 1 {
  331. t.Fatalf("expected non-nil request with one entry; got:\n%+v", got)
  332. }
  333. if got, want := got[0].Payload.(string), "info\n"; got != want {
  334. t.Errorf("payload: got %q, want %q", got, want)
  335. }
  336. if got, want := logging.Severity(got[0].Severity), logging.Info; got != want {
  337. t.Errorf("severity: got %s, want %s", got, want)
  338. }
  339. }
  340. func TestSeverity(t *testing.T) {
  341. if got, want := logging.Info.String(), "Info"; got != want {
  342. t.Errorf("got %q, want %q", got, want)
  343. }
  344. if got, want := logging.Severity(-99).String(), "-99"; got != want {
  345. t.Errorf("got %q, want %q", got, want)
  346. }
  347. }
  348. func TestParseSeverity(t *testing.T) {
  349. for _, test := range []struct {
  350. in string
  351. want logging.Severity
  352. }{
  353. {"", logging.Default},
  354. {"whatever", logging.Default},
  355. {"Default", logging.Default},
  356. {"ERROR", logging.Error},
  357. {"Error", logging.Error},
  358. {"error", logging.Error},
  359. } {
  360. got := logging.ParseSeverity(test.in)
  361. if got != test.want {
  362. t.Errorf("%q: got %s, want %s\n", test.in, got, test.want)
  363. }
  364. }
  365. }
  366. func TestErrors(t *testing.T) {
  367. initLogs() // Generate new testLogID
  368. // Drain errors already seen.
  369. loop:
  370. for {
  371. select {
  372. case <-errorc:
  373. default:
  374. break loop
  375. }
  376. }
  377. // Try to log something that can't be JSON-marshalled.
  378. lg := client.Logger(testLogID)
  379. lg.Log(logging.Entry{Payload: func() {}})
  380. // Expect an error from Flush.
  381. err := lg.Flush()
  382. if err == nil {
  383. t.Fatal("expected error, got nil")
  384. }
  385. }
  386. type badTokenSource struct{}
  387. func (badTokenSource) Token() (*oauth2.Token, error) {
  388. return &oauth2.Token{}, nil
  389. }
  390. func TestPing(t *testing.T) {
  391. // Ping twice, in case the service's InsertID logic messes with the error code.
  392. ctx := context.Background()
  393. // The global client should be valid.
  394. if err := client.Ping(ctx); err != nil {
  395. t.Errorf("project %s: got %v, expected nil", testProjectID, err)
  396. }
  397. if err := client.Ping(ctx); err != nil {
  398. t.Errorf("project %s, #2: got %v, expected nil", testProjectID, err)
  399. }
  400. // nonexistent project
  401. c, a := newClients(ctx, testProjectID+"-BAD")
  402. defer c.Close()
  403. defer a.Close()
  404. if err := c.Ping(ctx); err == nil {
  405. t.Errorf("nonexistent project: want error pinging logging api, got nil")
  406. }
  407. if err := c.Ping(ctx); err == nil {
  408. t.Errorf("nonexistent project, #2: want error pinging logging api, got nil")
  409. }
  410. // Bad creds. We cannot test this with the fake, since it doesn't do auth.
  411. if integrationTest {
  412. c, err := logging.NewClient(ctx, testProjectID, option.WithTokenSource(badTokenSource{}))
  413. if err != nil {
  414. t.Fatal(err)
  415. }
  416. if err := c.Ping(ctx); err == nil {
  417. t.Errorf("bad creds: want error pinging logging api, got nil")
  418. }
  419. if err := c.Ping(ctx); err == nil {
  420. t.Errorf("bad creds, #2: want error pinging logging api, got nil")
  421. }
  422. if err := c.Close(); err != nil {
  423. t.Fatalf("error closing client: %v", err)
  424. }
  425. }
  426. }
  427. func TestLogsAndDelete(t *testing.T) {
  428. // This function tests both the Logs and DeleteLog methods. We only try to
  429. // delete those logs that we can observe and that were generated by this
  430. // test. This may not include the logs generated from the current test run,
  431. // because the logging service is only eventually consistent. It's
  432. // therefore possible that on some runs, this test will do nothing.
  433. ctx := context.Background()
  434. it := aclient.Logs(ctx)
  435. nDeleted := 0
  436. for {
  437. logID, err := it.Next()
  438. if err == iterator.Done {
  439. break
  440. }
  441. if err != nil {
  442. t.Fatal(err)
  443. }
  444. if strings.HasPrefix(logID, testLogIDPrefix) {
  445. if err := aclient.DeleteLog(ctx, logID); err != nil {
  446. // Ignore NotFound. Sometimes, amazingly, DeleteLog cannot find
  447. // a log that is returned by Logs.
  448. if status.Code(err) != codes.NotFound {
  449. t.Fatalf("deleting %q: %v", logID, err)
  450. }
  451. } else {
  452. nDeleted++
  453. }
  454. }
  455. }
  456. t.Logf("deleted %d logs", nDeleted)
  457. }
  458. func TestNonProjectParent(t *testing.T) {
  459. ctx := context.Background()
  460. initLogs()
  461. parent := "organizations/" + ltesting.ValidOrgID
  462. c, a := newClients(ctx, parent)
  463. defer c.Close()
  464. defer a.Close()
  465. lg := c.Logger(testLogID)
  466. err := lg.LogSync(ctx, logging.Entry{Payload: "hello"})
  467. if integrationTest {
  468. // We don't have permission to log to the organization.
  469. if got, want := status.Code(err), codes.PermissionDenied; got != want {
  470. t.Errorf("got code %s, want %s", got, want)
  471. }
  472. return
  473. }
  474. // Continue test against fake.
  475. if err != nil {
  476. t.Fatal(err)
  477. }
  478. want := []*logging.Entry{{
  479. Timestamp: testNow().UTC(),
  480. Payload: "hello",
  481. LogName: parent + "/logs/" + testLogID,
  482. Resource: &mrpb.MonitoredResource{
  483. Type: "organization",
  484. Labels: map[string]string{"organization_id": ltesting.ValidOrgID},
  485. },
  486. }}
  487. var got []*logging.Entry
  488. ok := waitFor(func() bool {
  489. got, err = allEntries(ctx, a, fmt.Sprintf(`logName = "%s/logs/%s"`, parent,
  490. strings.Replace(testLogID, "/", "%2F", -1)))
  491. if err != nil {
  492. t.Log("fetching log entries: ", err)
  493. return false
  494. }
  495. return len(got) == len(want)
  496. })
  497. if !ok {
  498. t.Fatalf("timed out; got: %d, want: %d\n", len(got), len(want))
  499. }
  500. if msg, ok := compareEntries(got, want); !ok {
  501. t.Error(msg)
  502. }
  503. }
  504. // waitFor calls f repeatedly with exponential backoff, blocking until it returns true.
  505. // It returns false after a while (if it times out).
  506. func waitFor(f func() bool) bool {
  507. // TODO(shadams): Find a better way to deflake these tests.
  508. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
  509. defer cancel()
  510. err := cinternal.Retry(ctx,
  511. gax.Backoff{Initial: time.Second, Multiplier: 2, Max: 30 * time.Second},
  512. func() (bool, error) { return f(), nil })
  513. return err == nil
  514. }
  515. // Interleave a lot of Log and Flush calls, to induce race conditions.
  516. // Run this test with:
  517. // go test -run LogFlushRace -race -count 100
  518. func TestLogFlushRace(t *testing.T) {
  519. initLogs() // Generate new testLogID
  520. lg := client.Logger(testLogID,
  521. logging.ConcurrentWriteLimit(5), // up to 5 concurrent log writes
  522. logging.EntryCountThreshold(100)) // small bundle size to increase interleaving
  523. var wgf, wgl sync.WaitGroup
  524. donec := make(chan struct{})
  525. for i := 0; i < 10; i++ {
  526. wgl.Add(1)
  527. go func() {
  528. defer wgl.Done()
  529. for j := 0; j < 1e4; j++ {
  530. lg.Log(logging.Entry{Payload: "the payload"})
  531. }
  532. }()
  533. }
  534. for i := 0; i < 5; i++ {
  535. wgf.Add(1)
  536. go func() {
  537. defer wgf.Done()
  538. for {
  539. select {
  540. case <-donec:
  541. return
  542. case <-time.After(time.Duration(rand.Intn(5)) * time.Millisecond):
  543. if err := lg.Flush(); err != nil {
  544. t.Error(err)
  545. }
  546. }
  547. }
  548. }()
  549. }
  550. wgl.Wait()
  551. close(donec)
  552. wgf.Wait()
  553. }
  554. // Test the throughput of concurrent writers.
  555. func BenchmarkConcurrentWrites(b *testing.B) {
  556. if !integrationTest {
  557. b.Skip("only makes sense when running against production service")
  558. }
  559. for n := 1; n <= 32; n *= 2 {
  560. b.Run(fmt.Sprint(n), func(b *testing.B) {
  561. b.StopTimer()
  562. lg := client.Logger(testLogID, logging.ConcurrentWriteLimit(n), logging.EntryCountThreshold(1000))
  563. const (
  564. nEntries = 1e5
  565. payload = "the quick brown fox jumps over the lazy dog"
  566. )
  567. b.SetBytes(int64(nEntries * len(payload)))
  568. b.StartTimer()
  569. for i := 0; i < b.N; i++ {
  570. for j := 0; j < nEntries; j++ {
  571. lg.Log(logging.Entry{Payload: payload})
  572. }
  573. if err := lg.Flush(); err != nil {
  574. b.Fatal(err)
  575. }
  576. }
  577. })
  578. }
  579. }