25'ten fazla konu seçemezsiniz Konular bir harf veya rakamla başlamalı, kısa çizgiler ('-') içerebilir ve en fazla 35 karakter uzunluğunda olabilir.
 
 
 

308 satır
7.2 KiB

  1. package dynamodb
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "reflect"
  7. simplejson "github.com/bitly/go-simplejson"
  8. )
  9. type Stream struct {
  10. Server *Server
  11. Arn string
  12. }
  13. type StreamListItemT struct {
  14. StreamArn string
  15. StreamLabel string
  16. TableName string
  17. }
  18. type SequenceNumberRangeT struct {
  19. EndingSequenceNumber string
  20. StartingSequenceNumber string
  21. }
  22. type ShardT struct {
  23. ParentShardId string
  24. SequenceNumberRange SequenceNumberRangeT
  25. ShardId string
  26. }
  27. type StreamDescriptionT struct {
  28. CreationDateTime float64
  29. KeySchema []KeySchemaT
  30. LastEvaluatedShardId string
  31. Shards []ShardT
  32. StreamArn string
  33. StreamLabel string
  34. StreamStatus string
  35. StreamViewType string
  36. TableName string
  37. }
  38. type RecordT struct {
  39. AwsRegion string
  40. EventID string
  41. EventName string
  42. EventSource string
  43. EventVersion string
  44. StreamRecord *StreamRecordT
  45. }
  46. type StreamRecordT struct {
  47. Keys map[string]*Attribute
  48. NewImage map[string]*Attribute
  49. OldImage map[string]*Attribute
  50. SequenceNumber string
  51. StreamViewType string
  52. SizeBytes int64
  53. }
  54. type listStreamsResponse struct {
  55. Streams []StreamListItemT
  56. }
  57. type describeStreamResponse struct {
  58. StreamDescription StreamDescriptionT
  59. }
  60. var ErrNoRecords = errors.New("No records")
  61. func (s *Server) ListStreams(startArn string) ([]StreamListItemT, error) {
  62. return s.LimitedListTableStreams("", startArn, 0)
  63. }
  64. func (s *Server) LimitedListStreams(startArn string, limit int64) ([]StreamListItemT, error) {
  65. return s.LimitedListTableStreams("", startArn, limit)
  66. }
  67. func (s *Server) ListTableStreams(table, startArn string) ([]StreamListItemT, error) {
  68. return s.LimitedListTableStreams(table, startArn, 0)
  69. }
  70. func (s *Server) LimitedListTableStreams(table, startArn string, limit int64) ([]StreamListItemT, error) {
  71. query := NewEmptyQuery()
  72. if len(table) != 0 {
  73. query.addTableByName(table)
  74. }
  75. if len(startArn) != 0 {
  76. query.AddExclusiveStartStreamArn(startArn)
  77. }
  78. if limit > 0 {
  79. query.AddLimit(limit)
  80. }
  81. jsonResponse, err := s.queryServer(streamsTarget("ListStreams"), query)
  82. if err != nil {
  83. return nil, err
  84. }
  85. var r listStreamsResponse
  86. err = json.Unmarshal(jsonResponse, &r)
  87. if err != nil {
  88. return nil, err
  89. }
  90. return r.Streams, nil
  91. }
  92. func (s *Server) DescribeStream(arn, startShardId string) (*StreamDescriptionT, error) {
  93. return s.LimitedDescribeStream(arn, startShardId, 0)
  94. }
  95. func (s *Server) LimitedDescribeStream(arn, startShardId string, limit int64) (*StreamDescriptionT, error) {
  96. query := NewEmptyQuery()
  97. query.AddStreamArn(arn)
  98. if len(startShardId) != 0 {
  99. query.AddExclusiveStartShardId(startShardId)
  100. }
  101. if limit > 0 {
  102. query.AddLimit(limit)
  103. }
  104. jsonResponse, err := s.queryServer(streamsTarget("DescribeStream"), query)
  105. if err != nil {
  106. return nil, err
  107. }
  108. var r describeStreamResponse
  109. err = json.Unmarshal(jsonResponse, &r)
  110. if err != nil {
  111. return nil, err
  112. }
  113. return &r.StreamDescription, nil
  114. }
  115. func (s *Server) NewStream(streamArn string) *Stream {
  116. return &Stream{s, streamArn}
  117. }
  118. func (s *Stream) DescribeStream(startShardId string) (*StreamDescriptionT, error) {
  119. return s.Server.DescribeStream(s.Arn, startShardId)
  120. }
  121. func (s *Stream) LimitedDescribeStream(startShardId string, limit int64) (*StreamDescriptionT, error) {
  122. return s.Server.LimitedDescribeStream(s.Arn, startShardId, limit)
  123. }
  124. func (s *Server) GetShardIterator(streamArn, shardId, shardIteratorType, sequenceNumber string) (string, error) {
  125. query := NewEmptyQuery()
  126. query.AddStreamArn(streamArn)
  127. query.AddShardId(shardId)
  128. query.AddShardIteratorType(shardIteratorType)
  129. if len(sequenceNumber) != 0 {
  130. query.AddSequenceNumber(sequenceNumber)
  131. }
  132. jsonResponse, err := s.queryServer(streamsTarget("GetShardIterator"), query)
  133. if err != nil {
  134. return "unknown", err
  135. }
  136. json, err := simplejson.NewJson(jsonResponse)
  137. if err != nil {
  138. return "unknown", err
  139. }
  140. return json.Get("ShardIterator").MustString(), nil
  141. }
  142. func (s *Stream) GetShardIterator(shardId, shardIteratorType, sequenceNumber string) (string, error) {
  143. return s.Server.GetShardIterator(s.Arn, shardId, shardIteratorType, sequenceNumber)
  144. }
  145. func (s *Server) GetRecords(shardIterator string) (string, []*RecordT, error) {
  146. return s.LimitedGetRecords(shardIterator, 0)
  147. }
  148. func (s *Server) LimitedGetRecords(shardIterator string, limit int64) (string, []*RecordT, error) {
  149. query := NewEmptyQuery()
  150. query.AddShardIterator(shardIterator)
  151. if limit > 0 {
  152. query.AddLimit(limit)
  153. }
  154. jsonResponse, err := s.queryServer(streamsTarget("GetRecords"), query)
  155. if err != nil {
  156. return "", nil, err
  157. }
  158. jsonParsed, err := simplejson.NewJson(jsonResponse)
  159. if err != nil {
  160. return "", nil, err
  161. }
  162. nextShardIt := ""
  163. nextShardItJson, ok := jsonParsed.CheckGet("NextShardIterator")
  164. if ok {
  165. nextShardIt, err = nextShardItJson.String()
  166. if err != nil {
  167. message := fmt.Sprintf("Unexpected response %s", jsonResponse)
  168. return "", nil, errors.New(message)
  169. }
  170. }
  171. recordsJson, ok := jsonParsed.CheckGet("Records")
  172. if !ok {
  173. return nextShardIt, nil, ErrNoRecords
  174. }
  175. recordsArray, err := recordsJson.Array()
  176. if err != nil {
  177. message := fmt.Sprintf("Unexpected response %s", jsonResponse)
  178. return nextShardIt, nil, errors.New(message)
  179. }
  180. var records []*RecordT
  181. for _, record := range recordsArray {
  182. if recordMap, ok := record.(map[string]interface{}); ok {
  183. r := parseRecord(recordMap)
  184. records = append(records, r)
  185. }
  186. }
  187. return nextShardIt, records, nil
  188. }
  189. func (s *Stream) GetRecords(shardIterator string) (string, []*RecordT, error) {
  190. return s.Server.GetRecords(shardIterator)
  191. }
  192. func (s *Stream) LimitedGetRecords(shardIterator string, limit int64) (string, []*RecordT, error) {
  193. return s.Server.LimitedGetRecords(shardIterator, limit)
  194. }
  195. func parseRecord(r map[string]interface{}) *RecordT {
  196. record := RecordT{}
  197. rValue := reflect.ValueOf(&record)
  198. keys := []string{"awsRegion", "eventID", "eventName", "eventSource", "eventVersion"}
  199. for i, key := range keys {
  200. if value, ok := r[key]; ok {
  201. if valueStr, ok := value.(string); ok {
  202. rValue.Elem().Field(i).SetString(valueStr)
  203. }
  204. }
  205. }
  206. if streamRecord, ok := r["dynamodb"]; ok {
  207. if streamRecordMap, ok := streamRecord.(map[string]interface{}); ok {
  208. record.StreamRecord = parseStreamRecord(streamRecordMap)
  209. }
  210. }
  211. return &record
  212. }
  213. func parseStreamRecord(s map[string]interface{}) *StreamRecordT {
  214. sr := StreamRecordT{}
  215. rValue := reflect.ValueOf(&sr)
  216. attrKeys := []string{"Keys", "NewImage", "OldImage"}
  217. numAttrKeys := len(attrKeys)
  218. for i, key := range attrKeys {
  219. if value, ok := s[key]; ok {
  220. if valueMap, ok := value.(map[string]interface{}); ok {
  221. attrs := parseAttributes(valueMap)
  222. rValue.Elem().Field(i).Set(reflect.ValueOf(attrs))
  223. }
  224. }
  225. }
  226. strKeys := []string{"SequenceNumber", "StreamViewType"}
  227. numStrKeys := len(strKeys)
  228. for i, key := range strKeys {
  229. if value, ok := s[key]; ok {
  230. if valueStr, ok := value.(string); ok {
  231. rValue.Elem().Field(i + numAttrKeys).SetString(valueStr)
  232. }
  233. }
  234. }
  235. intKeys := []string{"SizeBytes"}
  236. for i, key := range intKeys {
  237. if value, ok := s[key]; ok {
  238. if valueNumber, ok := value.(json.Number); ok {
  239. if valueInt, err := valueNumber.Int64(); err == nil {
  240. rValue.Elem().Field(i + numAttrKeys + numStrKeys).SetInt(valueInt)
  241. }
  242. }
  243. }
  244. }
  245. return &sr
  246. }