Non puoi selezionare più di 25 argomenti Gli argomenti devono iniziare con una lettera o un numero, possono includere trattini ('-') e possono essere lunghi fino a 35 caratteri.
 
 
 

334 righe
9.7 KiB

  1. // Copyright 2017 Google Inc. All Rights Reserved.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package main
  15. import (
  16. "bufio"
  17. "encoding/base64"
  18. "encoding/json"
  19. "fmt"
  20. "log"
  21. "net"
  22. "net/http"
  23. "net/textproto"
  24. "os"
  25. "strings"
  26. pubsub "google.golang.org/api/pubsub/v1beta2"
  27. )
  28. const USAGE = `Available arguments are:
  29. <project_id> list_topics
  30. <project_id> create_topic <topic>
  31. <project_id> delete_topic <topic>
  32. <project_id> list_subscriptions
  33. <project_id> create_subscription <subscription> <linked topic>
  34. <project_id> delete_subscription <subscription>
  35. <project_id> connect_irc <topic> <server> <channel>
  36. <project_id> pull_messages <subscription>
  37. `
  38. type IRCBot struct {
  39. server string
  40. port string
  41. nick string
  42. user string
  43. channel string
  44. conn net.Conn
  45. tpReader *textproto.Reader
  46. }
  47. func NewIRCBot(server, channel, nick string) *IRCBot {
  48. return &IRCBot{
  49. server: server,
  50. port: "6667",
  51. nick: nick,
  52. channel: channel,
  53. conn: nil,
  54. user: nick,
  55. }
  56. }
  57. func (bot *IRCBot) Connect() {
  58. conn, err := net.Dial("tcp", bot.server+":"+bot.port)
  59. if err != nil {
  60. log.Fatal("unable to connect to IRC server ", err)
  61. }
  62. bot.conn = conn
  63. log.Printf("Connected to IRC server %s (%s)\n",
  64. bot.server, bot.conn.RemoteAddr())
  65. bot.tpReader = textproto.NewReader(bufio.NewReader(bot.conn))
  66. bot.Sendf("USER %s 8 * :%s\r\n", bot.nick, bot.nick)
  67. bot.Sendf("NICK %s\r\n", bot.nick)
  68. bot.Sendf("JOIN %s\r\n", bot.channel)
  69. }
  70. func (bot *IRCBot) CheckConnection() {
  71. for {
  72. line, err := bot.ReadLine()
  73. if err != nil {
  74. log.Fatal("Unable to read a line during checking the connection.")
  75. }
  76. if parts := strings.Split(line, " "); len(parts) > 1 {
  77. if parts[1] == "004" {
  78. log.Println("The nick accepted.")
  79. } else if parts[1] == "433" {
  80. log.Fatalf("The nick is already in use: %s", line)
  81. } else if parts[1] == "366" {
  82. log.Println("Starting to publish messages.")
  83. return
  84. }
  85. }
  86. }
  87. }
  88. func (bot *IRCBot) Sendf(format string, args ...interface{}) {
  89. fmt.Fprintf(bot.conn, format, args...)
  90. }
  91. func (bot *IRCBot) Close() {
  92. bot.conn.Close()
  93. }
  94. func (bot *IRCBot) ReadLine() (line string, err error) {
  95. return bot.tpReader.ReadLine()
  96. }
  97. func init() {
  98. registerDemo("pubsub", pubsub.PubsubScope, pubsubMain)
  99. }
  100. func pubsubUsage() {
  101. fmt.Fprint(os.Stderr, USAGE)
  102. }
  103. // Returns a fully qualified resource name for Cloud Pub/Sub.
  104. func fqrn(res, proj, name string) string {
  105. return fmt.Sprintf("projects/%s/%s/%s", proj, res, name)
  106. }
  107. func fullTopicName(proj, topic string) string {
  108. return fqrn("topics", proj, topic)
  109. }
  110. func fullSubName(proj, topic string) string {
  111. return fqrn("subscriptions", proj, topic)
  112. }
  113. // Check the length of the arguments.
  114. func checkArgs(argv []string, min int) {
  115. if len(argv) < min {
  116. pubsubUsage()
  117. os.Exit(2)
  118. }
  119. }
  120. func listTopics(service *pubsub.Service, argv []string) {
  121. next := ""
  122. for {
  123. topicsList, err := service.Projects.Topics.List(fmt.Sprintf("projects/%s", argv[0])).PageToken(next).Do()
  124. if err != nil {
  125. log.Fatalf("listTopics query.Do() failed: %v", err)
  126. }
  127. for _, topic := range topicsList.Topics {
  128. fmt.Println(topic.Name)
  129. }
  130. next = topicsList.NextPageToken
  131. if next == "" {
  132. break
  133. }
  134. }
  135. }
  136. func createTopic(service *pubsub.Service, argv []string) {
  137. checkArgs(argv, 3)
  138. topic, err := service.Projects.Topics.Create(fullTopicName(argv[0], argv[2]), &pubsub.Topic{}).Do()
  139. if err != nil {
  140. log.Fatalf("createTopic Create().Do() failed: %v", err)
  141. }
  142. fmt.Printf("Topic %s was created.\n", topic.Name)
  143. }
  144. func deleteTopic(service *pubsub.Service, argv []string) {
  145. checkArgs(argv, 3)
  146. topicName := fullTopicName(argv[0], argv[2])
  147. if _, err := service.Projects.Topics.Delete(topicName).Do(); err != nil {
  148. log.Fatalf("deleteTopic Delete().Do() failed: %v", err)
  149. }
  150. fmt.Printf("Topic %s was deleted.\n", topicName)
  151. }
  152. func listSubscriptions(service *pubsub.Service, argv []string) {
  153. next := ""
  154. for {
  155. subscriptionsList, err := service.Projects.Subscriptions.List(fmt.Sprintf("projects/%s", argv[0])).PageToken(next).Do()
  156. if err != nil {
  157. log.Fatalf("listSubscriptions query.Do() failed: %v", err)
  158. }
  159. for _, subscription := range subscriptionsList.Subscriptions {
  160. sub_text, _ := json.MarshalIndent(subscription, "", " ")
  161. fmt.Printf("%s\n", sub_text)
  162. }
  163. next = subscriptionsList.NextPageToken
  164. if next == "" {
  165. break
  166. }
  167. }
  168. }
  169. func createSubscription(service *pubsub.Service, argv []string) {
  170. checkArgs(argv, 4)
  171. name := fullSubName(argv[0], argv[2])
  172. sub := &pubsub.Subscription{Topic: fullTopicName(argv[0], argv[3])}
  173. subscription, err := service.Projects.Subscriptions.Create(name, sub).Do()
  174. if err != nil {
  175. log.Fatalf("createSubscription Create().Do() failed: %v", err)
  176. }
  177. fmt.Printf("Subscription %s was created.\n", subscription.Name)
  178. }
  179. func deleteSubscription(service *pubsub.Service, argv []string) {
  180. checkArgs(argv, 3)
  181. name := fullSubName(argv[0], argv[2])
  182. if _, err := service.Projects.Subscriptions.Delete(name).Do(); err != nil {
  183. log.Fatalf("deleteSubscription Delete().Do() failed: %v", err)
  184. }
  185. fmt.Printf("Subscription %s was deleted.\n", name)
  186. }
  187. func connectIRC(service *pubsub.Service, argv []string) {
  188. checkArgs(argv, 5)
  189. topicName := fullTopicName(argv[0], argv[2])
  190. server := argv[3]
  191. channel := argv[4]
  192. nick := fmt.Sprintf("bot-%s", argv[2])
  193. ircbot := NewIRCBot(server, channel, nick)
  194. ircbot.Connect()
  195. defer ircbot.Close()
  196. ircbot.CheckConnection()
  197. privMark := fmt.Sprintf("PRIVMSG %s :", ircbot.channel)
  198. for {
  199. line, err := ircbot.ReadLine()
  200. if err != nil {
  201. log.Fatal("Unable to read a line from the connection.")
  202. }
  203. parts := strings.Split(line, " ")
  204. if len(parts) > 0 && parts[0] == "PING" {
  205. ircbot.Sendf("PONG %s\r\n", parts[1])
  206. } else {
  207. pos := strings.Index(line, privMark)
  208. if pos == -1 {
  209. continue
  210. }
  211. privMsg := line[pos+len(privMark) : len(line)]
  212. pubsubMessage := &pubsub.PubsubMessage{
  213. Data: base64.StdEncoding.EncodeToString([]byte(privMsg)),
  214. }
  215. publishRequest := &pubsub.PublishRequest{
  216. Messages: []*pubsub.PubsubMessage{pubsubMessage},
  217. }
  218. if _, err := service.Projects.Topics.Publish(topicName, publishRequest).Do(); err != nil {
  219. log.Fatalf("connectIRC Publish().Do() failed: %v", err)
  220. }
  221. log.Println("Published a message to the topic.")
  222. }
  223. }
  224. }
  225. func pullMessages(service *pubsub.Service, argv []string) {
  226. checkArgs(argv, 3)
  227. subName := fullSubName(argv[0], argv[2])
  228. pullRequest := &pubsub.PullRequest{
  229. ReturnImmediately: false,
  230. MaxMessages: 1,
  231. }
  232. for {
  233. pullResponse, err := service.Projects.Subscriptions.Pull(subName, pullRequest).Do()
  234. if err != nil {
  235. log.Fatalf("pullMessages Pull().Do() failed: %v", err)
  236. }
  237. for _, receivedMessage := range pullResponse.ReceivedMessages {
  238. data, err := base64.StdEncoding.DecodeString(receivedMessage.Message.Data)
  239. if err != nil {
  240. log.Fatalf("pullMessages DecodeString() failed: %v", err)
  241. }
  242. fmt.Printf("%s\n", data)
  243. ackRequest := &pubsub.AcknowledgeRequest{
  244. AckIds: []string{receivedMessage.AckId},
  245. }
  246. if _, err = service.Projects.Subscriptions.Acknowledge(subName, ackRequest).Do(); err != nil {
  247. log.Printf("pullMessages Acknowledge().Do() failed: %v", err)
  248. }
  249. }
  250. }
  251. }
  252. // This example demonstrates calling the Cloud Pub/Sub API. As of 20
  253. // Aug 2014, the Cloud Pub/Sub API is only available if you're
  254. // whitelisted. If you're interested in using it, please apply for the
  255. // Limited Preview program at the following form:
  256. // http://goo.gl/Wql9HL
  257. //
  258. // Also, before running this example, be sure to enable Cloud Pub/Sub
  259. // service on your project in Developer Console at:
  260. // https://console.developers.google.com/
  261. //
  262. // It has 8 subcommands as follows:
  263. //
  264. // <project_id> list_topics
  265. // <project_id> create_topic <topic>
  266. // <project_id> delete_topic <topic>
  267. // <project_id> list_subscriptions
  268. // <project_id> create_subscription <subscription> <linked topic>
  269. // <project_id> delete_subscription <subscription>
  270. // <project_id> connect_irc <topic> <server> <channel>
  271. // <project_id> pull_messages <subscription>
  272. //
  273. // You can use either of your alphanumerical or numerial Cloud Project
  274. // ID for project_id. You can choose any names for topic and
  275. // subscription as long as they follow the naming rule described at:
  276. // https://developers.google.com/pubsub/overview#names
  277. //
  278. // You can list/create/delete topics/subscriptions by self-explanatory
  279. // subcommands, as well as connect to an IRC channel and publish
  280. // messages from the IRC channel to a specified Cloud Pub/Sub topic by
  281. // the "connect_irc" subcommand, or continuously pull messages from a
  282. // specified Cloud Pub/Sub subscription and display the data by the
  283. // "pull_messages" subcommand.
  284. func pubsubMain(client *http.Client, argv []string) {
  285. checkArgs(argv, 2)
  286. service, err := pubsub.New(client)
  287. if err != nil {
  288. log.Fatalf("Unable to create PubSub service: %v", err)
  289. }
  290. m := map[string]func(service *pubsub.Service, argv []string){
  291. "list_topics": listTopics,
  292. "create_topic": createTopic,
  293. "delete_topic": deleteTopic,
  294. "list_subscriptions": listSubscriptions,
  295. "create_subscription": createSubscription,
  296. "delete_subscription": deleteSubscription,
  297. "connect_irc": connectIRC,
  298. "pull_messages": pullMessages,
  299. }
  300. f, ok := m[argv[1]]
  301. if !ok {
  302. pubsubUsage()
  303. os.Exit(2)
  304. }
  305. f(service, argv)
  306. }