Nevar pievienot vairāk kā 25 tēmas Tēmai ir jāsākas ar burtu vai ciparu, tā var saturēt domu zīmes ('-') un var būt līdz 35 simboliem gara.
 
 
 

564 rindas
18 KiB

  1. // Copyright 2018 Google LLC
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. //
  15. // Package proftest contains test helpers for profiler agent integration tests.
  16. // This package is experimental.
  17. package proftest
  18. import (
  19. "archive/zip"
  20. "bytes"
  21. "context"
  22. "encoding/json"
  23. "errors"
  24. "fmt"
  25. "io/ioutil"
  26. "log"
  27. "net/http"
  28. "strings"
  29. "time"
  30. "cloud.google.com/go/storage"
  31. gax "github.com/googleapis/gax-go/v2"
  32. "golang.org/x/build/kubernetes"
  33. k8sapi "golang.org/x/build/kubernetes/api"
  34. "golang.org/x/build/kubernetes/gke"
  35. cloudbuild "google.golang.org/api/cloudbuild/v1"
  36. compute "google.golang.org/api/compute/v1"
  37. container "google.golang.org/api/container/v1"
  38. "google.golang.org/api/googleapi"
  39. )
  40. const (
  41. monitorWriteScope = "https://www.googleapis.com/auth/monitoring.write"
  42. )
  43. // TestRunner has common elements used for testing profiling agents on a range
  44. // of environments.
  45. type TestRunner struct {
  46. Client *http.Client
  47. }
  48. // GCETestRunner supports testing a profiling agent on GCE.
  49. type GCETestRunner struct {
  50. TestRunner
  51. ComputeService *compute.Service
  52. }
  53. // GKETestRunner supports testing a profiling agent on GKE.
  54. type GKETestRunner struct {
  55. TestRunner
  56. ContainerService *container.Service
  57. StorageClient *storage.Client
  58. Dockerfile string
  59. }
  60. // ProfileResponse contains the response produced when querying profile server.
  61. type ProfileResponse struct {
  62. Profile ProfileData `json:"profile"`
  63. NumProfiles int32 `json:"numProfiles"`
  64. Deployments []interface{} `json:"deployments"`
  65. }
  66. // ProfileData has data of a single profile.
  67. type ProfileData struct {
  68. Samples []int32 `json:"samples"`
  69. SampleMetrics interface{} `json:"sampleMetrics"`
  70. DefaultMetricType string `json:"defaultMetricType"`
  71. TreeNodes interface{} `json:"treeNodes"`
  72. Functions functionArray `json:"functions"`
  73. SourceFiles sourceFileArray `json:"sourceFiles"`
  74. }
  75. type functionArray struct {
  76. Name []string `json:"name"`
  77. Sourcefile []int32 `json:"sourceFile"`
  78. }
  79. type sourceFileArray struct {
  80. Name []string `json:"name"`
  81. }
  82. // InstanceConfig is configuration for starting single GCE instance for
  83. // profiling agent test case.
  84. type InstanceConfig struct {
  85. ProjectID string
  86. Zone string
  87. Name string
  88. StartupScript string
  89. MachineType string
  90. }
  91. // ClusterConfig is configuration for starting single GKE cluster for profiling
  92. // agent test case.
  93. type ClusterConfig struct {
  94. ProjectID string
  95. Zone string
  96. ClusterName string
  97. PodName string
  98. ImageSourceName string
  99. ImageName string
  100. Bucket string
  101. Dockerfile string
  102. }
  103. // CheckNonEmpty returns nil if the profile has a profiles and deployments
  104. // associated. Otherwise, returns a desciptive error.
  105. func (pr *ProfileResponse) CheckNonEmpty() error {
  106. if pr.NumProfiles == 0 {
  107. return fmt.Errorf("profile response contains zero profiles: %v", pr)
  108. }
  109. if len(pr.Deployments) == 0 {
  110. return fmt.Errorf("profile response contains zero deployments: %v", pr)
  111. }
  112. return nil
  113. }
  114. // HasFunction returns nil if the function is present, or, if the function is
  115. // not present, and error providing more details why the function is not
  116. // present.
  117. func (pr *ProfileResponse) HasFunction(functionName string) error {
  118. if err := pr.CheckNonEmpty(); err != nil {
  119. return fmt.Errorf("failed to find function name %s in profile: %v", functionName, err)
  120. }
  121. for _, name := range pr.Profile.Functions.Name {
  122. if strings.Contains(name, functionName) {
  123. return nil
  124. }
  125. }
  126. return fmt.Errorf("failed to find function name %s in profile", functionName)
  127. }
  128. // HasFunctionInFile returns nil if function is present in the specifed file, and an
  129. // error if the function/file combination is not present in the profile.
  130. func (pr *ProfileResponse) HasFunctionInFile(functionName string, filename string) error {
  131. if err := pr.CheckNonEmpty(); err != nil {
  132. return fmt.Errorf("failed to find function name %s in file %s in profile: %v", functionName, filename, err)
  133. }
  134. for i, name := range pr.Profile.Functions.Name {
  135. file := pr.Profile.SourceFiles.Name[pr.Profile.Functions.Sourcefile[i]]
  136. if strings.Contains(name, functionName) && strings.HasSuffix(file, filename) {
  137. return nil
  138. }
  139. }
  140. return fmt.Errorf("failed to find function name %s in file %s in profile", functionName, filename)
  141. }
  142. // HasSourceFile returns nil if the file (or file where the end of the file path
  143. // matches the filename) is present in the profile. Or, if the filename is not
  144. // present, an error is returned.
  145. func (pr *ProfileResponse) HasSourceFile(filename string) error {
  146. if err := pr.CheckNonEmpty(); err != nil {
  147. return fmt.Errorf("failed to find filename %s in profile: %v", filename, err)
  148. }
  149. for _, name := range pr.Profile.SourceFiles.Name {
  150. if strings.HasSuffix(name, filename) {
  151. return nil
  152. }
  153. }
  154. return fmt.Errorf("failed to find filename %s in profile", filename)
  155. }
  156. // StartInstance starts a GCE Instance with name, zone, and projectId specified
  157. // by the inst, and which runs the startup script specified in inst.
  158. func (tr *GCETestRunner) StartInstance(ctx context.Context, inst *InstanceConfig) error {
  159. img, err := tr.ComputeService.Images.GetFromFamily("debian-cloud", "debian-9").Context(ctx).Do()
  160. if err != nil {
  161. return err
  162. }
  163. op, err := tr.ComputeService.Instances.Insert(inst.ProjectID, inst.Zone, &compute.Instance{
  164. MachineType: fmt.Sprintf("zones/%s/machineTypes/%s", inst.Zone, inst.MachineType),
  165. Name: inst.Name,
  166. Disks: []*compute.AttachedDisk{{
  167. AutoDelete: true, // delete the disk when the VM is deleted.
  168. Boot: true,
  169. Type: "PERSISTENT",
  170. Mode: "READ_WRITE",
  171. InitializeParams: &compute.AttachedDiskInitializeParams{
  172. SourceImage: img.SelfLink,
  173. DiskType: fmt.Sprintf("https://www.googleapis.com/compute/v1/projects/%s/zones/%s/diskTypes/pd-standard", inst.ProjectID, inst.Zone),
  174. },
  175. }},
  176. NetworkInterfaces: []*compute.NetworkInterface{{
  177. Network: fmt.Sprintf("https://www.googleapis.com/compute/v1/projects/%s/global/networks/default", inst.ProjectID),
  178. AccessConfigs: []*compute.AccessConfig{{
  179. Name: "External NAT",
  180. }},
  181. }},
  182. Metadata: &compute.Metadata{
  183. Items: []*compute.MetadataItems{{
  184. Key: "startup-script",
  185. Value: googleapi.String(inst.StartupScript),
  186. }},
  187. },
  188. ServiceAccounts: []*compute.ServiceAccount{{
  189. Email: "default",
  190. Scopes: []string{
  191. monitorWriteScope,
  192. },
  193. }},
  194. }).Do()
  195. if err != nil {
  196. return fmt.Errorf("failed to create instance: %v", err)
  197. }
  198. // Poll status of the operation to create the instance.
  199. getOpCall := tr.ComputeService.ZoneOperations.Get(inst.ProjectID, inst.Zone, op.Name)
  200. for {
  201. if err := checkOpErrors(op); err != nil {
  202. return fmt.Errorf("failed to create instance: %v", err)
  203. }
  204. if op.Status == "DONE" {
  205. return nil
  206. }
  207. if err := gax.Sleep(ctx, 5*time.Second); err != nil {
  208. return err
  209. }
  210. op, err = getOpCall.Do()
  211. if err != nil {
  212. return fmt.Errorf("failed to get operation: %v", err)
  213. }
  214. }
  215. }
  216. // checkOpErrors returns nil if the operation does not have any errors and an
  217. // error summarizing all errors encountered if the operation has errored.
  218. func checkOpErrors(op *compute.Operation) error {
  219. if op.Error == nil || len(op.Error.Errors) == 0 {
  220. return nil
  221. }
  222. var errs []string
  223. for _, e := range op.Error.Errors {
  224. if e.Message != "" {
  225. errs = append(errs, e.Message)
  226. } else {
  227. errs = append(errs, e.Code)
  228. }
  229. }
  230. return errors.New(strings.Join(errs, ","))
  231. }
  232. // DeleteInstance deletes an instance with project id, name, and zone matched
  233. // by inst.
  234. func (tr *GCETestRunner) DeleteInstance(ctx context.Context, inst *InstanceConfig) error {
  235. if _, err := tr.ComputeService.Instances.Delete(inst.ProjectID, inst.Zone, inst.Name).Context(ctx).Do(); err != nil {
  236. return fmt.Errorf("Instances.Delete(%s) got error: %v", inst.Name, err)
  237. }
  238. return nil
  239. }
  240. // PollForSerialOutput polls serial port 2 of the GCE instance specified by
  241. // inst and returns when the finishString appears in the serial output
  242. // of the instance, or when the context times out.
  243. func (tr *GCETestRunner) PollForSerialOutput(ctx context.Context, inst *InstanceConfig, finishString, errorString string) error {
  244. var output string
  245. defer func() {
  246. log.Printf("Serial port output for %s:\n%s", inst.Name, output)
  247. }()
  248. for {
  249. select {
  250. case <-ctx.Done():
  251. return ctx.Err()
  252. case <-time.After(20 * time.Second):
  253. resp, err := tr.ComputeService.Instances.GetSerialPortOutput(inst.ProjectID, inst.Zone, inst.Name).Port(2).Context(ctx).Do()
  254. if err != nil {
  255. // Transient failure.
  256. log.Printf("Transient error getting serial port output from instance %s (will retry): %v", inst.Name, err)
  257. continue
  258. }
  259. if resp.Contents == "" {
  260. log.Printf("Ignoring empty serial port output from instance %s (will retry)", inst.Name)
  261. continue
  262. }
  263. if output = resp.Contents; strings.Contains(output, finishString) {
  264. return nil
  265. }
  266. if strings.Contains(output, errorString) {
  267. return fmt.Errorf("failed to execute the prober benchmark script")
  268. }
  269. }
  270. }
  271. }
  272. // QueryProfiles retrieves profiles of a specific type, from a specific time
  273. // range, associated with a particular service and project.
  274. func (tr *TestRunner) QueryProfiles(projectID, service, startTime, endTime, profileType string) (ProfileResponse, error) {
  275. queryURL := fmt.Sprintf("https://cloudprofiler.googleapis.com/v2/projects/%s/profiles:query", projectID)
  276. const queryJSONFmt = `{"endTime": "%s", "profileType": "%s","startTime": "%s", "target": "%s"}`
  277. queryRequest := fmt.Sprintf(queryJSONFmt, endTime, profileType, startTime, service)
  278. req, err := http.NewRequest("POST", queryURL, strings.NewReader(queryRequest))
  279. if err != nil {
  280. return ProfileResponse{}, fmt.Errorf("failed to create an API request: %v", err)
  281. }
  282. req.Header = map[string][]string{
  283. "X-Goog-User-Project": {projectID},
  284. }
  285. resp, err := tr.Client.Do(req)
  286. if err != nil {
  287. return ProfileResponse{}, fmt.Errorf("failed to query API: %v", err)
  288. }
  289. defer resp.Body.Close()
  290. body, err := ioutil.ReadAll(resp.Body)
  291. if err != nil {
  292. return ProfileResponse{}, fmt.Errorf("failed to read response body: %v", err)
  293. }
  294. if resp.StatusCode != 200 {
  295. return ProfileResponse{}, fmt.Errorf("failed to query API: status: %s, response body: %s", resp.Status, string(body))
  296. }
  297. var pr ProfileResponse
  298. if err := json.Unmarshal(body, &pr); err != nil {
  299. return ProfileResponse{}, err
  300. }
  301. return pr, nil
  302. }
  303. // createAndPublishDockerImage creates a docker image from source code in a GCS
  304. // bucket and pushes the image to Google Container Registry.
  305. func (tr *GKETestRunner) createAndPublishDockerImage(ctx context.Context, projectID, sourceBucket, sourceObject, ImageName string) error {
  306. cloudbuildService, err := cloudbuild.New(tr.Client)
  307. if err != nil {
  308. return err
  309. }
  310. build := &cloudbuild.Build{
  311. Source: &cloudbuild.Source{
  312. StorageSource: &cloudbuild.StorageSource{
  313. Bucket: sourceBucket,
  314. Object: sourceObject,
  315. },
  316. },
  317. Steps: []*cloudbuild.BuildStep{
  318. {
  319. Name: "gcr.io/cloud-builders/docker",
  320. Args: []string{"build", "-t", ImageName, "."},
  321. },
  322. },
  323. Images: []string{ImageName},
  324. }
  325. op, err := cloudbuildService.Projects.Builds.Create(projectID, build).Context(ctx).Do()
  326. if err != nil {
  327. return fmt.Errorf("failed to create image: %v", err)
  328. }
  329. opID := op.Name
  330. // Wait for creating image.
  331. for {
  332. select {
  333. case <-ctx.Done():
  334. return fmt.Errorf("timed out waiting creating image")
  335. case <-time.After(10 * time.Second):
  336. op, err := cloudbuildService.Operations.Get(opID).Context(ctx).Do()
  337. if err != nil {
  338. log.Printf("Transient error getting operation (will retry): %v", err)
  339. break
  340. }
  341. if op.Done {
  342. log.Printf("Published image %s to Google Container Registry.", ImageName)
  343. return nil
  344. }
  345. }
  346. }
  347. }
  348. type imageResponse struct {
  349. Manifest map[string]interface{} `json:"manifest"`
  350. Name string `json:"name"`
  351. Tags []string `json:"tags"`
  352. }
  353. // deleteDockerImage deletes a docker image from Google Container Registry.
  354. func (tr *GKETestRunner) deleteDockerImage(ctx context.Context, ImageName string) []error {
  355. queryImageURL := fmt.Sprintf("https://gcr.io/v2/%s/tags/list", ImageName)
  356. resp, err := tr.Client.Get(queryImageURL)
  357. if err != nil {
  358. return []error{fmt.Errorf("failed to list tags: %v", err)}
  359. }
  360. defer resp.Body.Close()
  361. body, err := ioutil.ReadAll(resp.Body)
  362. if err != nil {
  363. return []error{err}
  364. }
  365. var ir imageResponse
  366. if err := json.Unmarshal(body, &ir); err != nil {
  367. return []error{err}
  368. }
  369. const deleteImageURLFmt = "https://gcr.io/v2/%s/manifests/%s"
  370. var errs []error
  371. for _, tag := range ir.Tags {
  372. if err := deleteDockerImageResource(tr.Client, fmt.Sprintf(deleteImageURLFmt, ImageName, tag)); err != nil {
  373. errs = append(errs, fmt.Errorf("failed to delete tag %s: %v", tag, err))
  374. }
  375. }
  376. for manifest := range ir.Manifest {
  377. if err := deleteDockerImageResource(tr.Client, fmt.Sprintf(deleteImageURLFmt, ImageName, manifest)); err != nil {
  378. errs = append(errs, fmt.Errorf("failed to delete manifest %s: %v", manifest, err))
  379. }
  380. }
  381. return errs
  382. }
  383. func deleteDockerImageResource(client *http.Client, url string) error {
  384. req, err := http.NewRequest("DELETE", url, nil)
  385. if err != nil {
  386. return fmt.Errorf("failed to get request: %v", err)
  387. }
  388. resp, err := client.Do(req)
  389. if err != nil {
  390. return fmt.Errorf("failed to delete resource: %v", err)
  391. }
  392. defer resp.Body.Close()
  393. if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusAccepted {
  394. return fmt.Errorf("failed to delete resource: status code = %d", resp.StatusCode)
  395. }
  396. return nil
  397. }
  398. func (tr *GKETestRunner) deployContainer(ctx context.Context, kubernetesClient *kubernetes.Client, podName, ImageName string) error {
  399. // TODO: Pod restart policy defaults to "Always". Previous logs will disappear
  400. // after restarting. Always restart causes the test not be able to see the
  401. // finish signal. Should probably set the restart policy to "OnFailure" when
  402. // we get the GKE workflow working and testable.
  403. pod := &k8sapi.Pod{
  404. ObjectMeta: k8sapi.ObjectMeta{
  405. Name: podName,
  406. },
  407. Spec: k8sapi.PodSpec{
  408. Containers: []k8sapi.Container{
  409. {
  410. Name: "profiler-test",
  411. Image: fmt.Sprintf("gcr.io/%s:latest", ImageName),
  412. },
  413. },
  414. },
  415. }
  416. if _, err := kubernetesClient.RunLongLivedPod(ctx, pod); err != nil {
  417. return fmt.Errorf("failed to run pod %s: %v", podName, err)
  418. }
  419. return nil
  420. }
  421. // PollPodLog polls the log of the kubernetes client and returns when the
  422. // finishString appears in the log, or when the context times out.
  423. func (tr *GKETestRunner) PollPodLog(ctx context.Context, kubernetesClient *kubernetes.Client, podName, finishString string) error {
  424. var output string
  425. defer func() {
  426. log.Printf("Log for pod %s:\n%s", podName, output)
  427. }()
  428. for {
  429. select {
  430. case <-ctx.Done():
  431. return fmt.Errorf("timed out waiting profiling finishing on container")
  432. case <-time.After(20 * time.Second):
  433. var err error
  434. output, err = kubernetesClient.PodLog(ctx, podName)
  435. if err != nil {
  436. // Transient failure.
  437. log.Printf("Transient error getting log (will retry): %v", err)
  438. continue
  439. }
  440. if strings.Contains(output, finishString) {
  441. return nil
  442. }
  443. }
  444. }
  445. }
  446. // DeleteClusterAndImage deletes cluster and images used to create cluster.
  447. func (tr *GKETestRunner) DeleteClusterAndImage(ctx context.Context, cfg *ClusterConfig) []error {
  448. var errs []error
  449. if err := tr.StorageClient.Bucket(cfg.Bucket).Object(cfg.ImageSourceName).Delete(ctx); err != nil {
  450. errs = append(errs, fmt.Errorf("failed to delete storage client: %v", err))
  451. }
  452. for _, err := range tr.deleteDockerImage(ctx, cfg.ImageName) {
  453. errs = append(errs, fmt.Errorf("failed to delete docker image: %v", err))
  454. }
  455. if _, err := tr.ContainerService.Projects.Zones.Clusters.Delete(cfg.ProjectID, cfg.Zone, cfg.ClusterName).Context(ctx).Do(); err != nil {
  456. errs = append(errs, fmt.Errorf("failed to delete cluster %s: %v", cfg.ClusterName, err))
  457. }
  458. return errs
  459. }
  460. // StartAndDeployCluster creates image needed for cluster, then starts and
  461. // deploys to cluster.
  462. func (tr *GKETestRunner) StartAndDeployCluster(ctx context.Context, cfg *ClusterConfig) (*kubernetes.Client, error) {
  463. if err := tr.uploadImageSource(ctx, cfg.Bucket, cfg.ImageSourceName, cfg.Dockerfile); err != nil {
  464. return nil, fmt.Errorf("failed to upload image source: %v", err)
  465. }
  466. createImageCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
  467. defer cancel()
  468. if err := tr.createAndPublishDockerImage(createImageCtx, cfg.ProjectID, cfg.Bucket, cfg.ImageSourceName, fmt.Sprintf("gcr.io/%s", cfg.ImageName)); err != nil {
  469. return nil, fmt.Errorf("failed to create and publish docker image %s: %v", cfg.ImageName, err)
  470. }
  471. kubernetesClient, err := gke.NewClient(ctx, cfg.ClusterName, gke.OptZone(cfg.Zone), gke.OptProject(cfg.ProjectID))
  472. if err != nil {
  473. return nil, fmt.Errorf("failed to create new GKE client: %v", err)
  474. }
  475. deployContainerCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
  476. defer cancel()
  477. if err := tr.deployContainer(deployContainerCtx, kubernetesClient, cfg.PodName, cfg.ImageName); err != nil {
  478. return nil, fmt.Errorf("failed to deploy image %q to pod %q: %v", cfg.PodName, cfg.ImageName, err)
  479. }
  480. return kubernetesClient, nil
  481. }
  482. // uploadImageSource uploads source code for building docker image to GCS.
  483. func (tr *GKETestRunner) uploadImageSource(ctx context.Context, bucket, objectName, dockerfile string) error {
  484. zipBuf := new(bytes.Buffer)
  485. z := zip.NewWriter(zipBuf)
  486. f, err := z.Create("Dockerfile")
  487. if err != nil {
  488. return err
  489. }
  490. if _, err := f.Write([]byte(dockerfile)); err != nil {
  491. return err
  492. }
  493. if err := z.Close(); err != nil {
  494. return err
  495. }
  496. wc := tr.StorageClient.Bucket(bucket).Object(objectName).NewWriter(ctx)
  497. wc.ContentType = "application/zip"
  498. wc.ACL = []storage.ACLRule{{Entity: storage.AllUsers, Role: storage.RoleReader}}
  499. if _, err := wc.Write(zipBuf.Bytes()); err != nil {
  500. return err
  501. }
  502. return wc.Close()
  503. }