You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

352 lines
8.4 KiB

  1. package dynamodb
  2. import simplejson "github.com/bitly/go-simplejson"
  3. import (
  4. "errors"
  5. "fmt"
  6. "log"
  7. )
  8. type BatchGetItem struct {
  9. Server *Server
  10. Keys map[*Table][]Key
  11. }
  12. type BatchWriteItem struct {
  13. Server *Server
  14. ItemActions map[*Table]map[string][][]Attribute
  15. }
  16. func (t *Table) BatchGetItems(keys []Key) *BatchGetItem {
  17. batchGetItem := &BatchGetItem{t.Server, make(map[*Table][]Key)}
  18. batchGetItem.Keys[t] = keys
  19. return batchGetItem
  20. }
  21. func (t *Table) BatchWriteItems(itemActions map[string][][]Attribute) *BatchWriteItem {
  22. batchWriteItem := &BatchWriteItem{t.Server, make(map[*Table]map[string][][]Attribute)}
  23. batchWriteItem.ItemActions[t] = itemActions
  24. return batchWriteItem
  25. }
  26. func (batchGetItem *BatchGetItem) AddTable(t *Table, keys *[]Key) *BatchGetItem {
  27. batchGetItem.Keys[t] = *keys
  28. return batchGetItem
  29. }
  30. func (batchWriteItem *BatchWriteItem) AddTable(t *Table, itemActions *map[string][][]Attribute) *BatchWriteItem {
  31. batchWriteItem.ItemActions[t] = *itemActions
  32. return batchWriteItem
  33. }
  34. func (batchGetItem *BatchGetItem) Execute() (map[string][]map[string]*Attribute, error) {
  35. q := NewEmptyQuery()
  36. q.AddGetRequestItems(batchGetItem.Keys)
  37. jsonResponse, err := batchGetItem.Server.queryServer("DynamoDB_20120810.BatchGetItem", q)
  38. if err != nil {
  39. return nil, err
  40. }
  41. json, err := simplejson.NewJson(jsonResponse)
  42. if err != nil {
  43. return nil, err
  44. }
  45. results := make(map[string][]map[string]*Attribute)
  46. tables, err := json.Get("Responses").Map()
  47. if err != nil {
  48. message := fmt.Sprintf("Unexpected response %s", jsonResponse)
  49. return nil, errors.New(message)
  50. }
  51. for table, entries := range tables {
  52. var tableResult []map[string]*Attribute
  53. jsonEntriesArray, ok := entries.([]interface{})
  54. if !ok {
  55. message := fmt.Sprintf("Unexpected response %s", jsonResponse)
  56. return nil, errors.New(message)
  57. }
  58. for _, entry := range jsonEntriesArray {
  59. item, ok := entry.(map[string]interface{})
  60. if !ok {
  61. message := fmt.Sprintf("Unexpected response %s", jsonResponse)
  62. return nil, errors.New(message)
  63. }
  64. unmarshalledItem := parseAttributes(item)
  65. tableResult = append(tableResult, unmarshalledItem)
  66. }
  67. results[table] = tableResult
  68. }
  69. return results, nil
  70. }
  71. func (batchWriteItem *BatchWriteItem) Execute() (map[string]interface{}, error) {
  72. q := NewEmptyQuery()
  73. q.AddWriteRequestItems(batchWriteItem.ItemActions)
  74. jsonResponse, err := batchWriteItem.Server.queryServer("DynamoDB_20120810.BatchWriteItem", q)
  75. if err != nil {
  76. return nil, err
  77. }
  78. json, err := simplejson.NewJson(jsonResponse)
  79. if err != nil {
  80. return nil, err
  81. }
  82. unprocessed, err := json.Get("UnprocessedItems").Map()
  83. if err != nil {
  84. message := fmt.Sprintf("Unexpected response %s", jsonResponse)
  85. return nil, errors.New(message)
  86. }
  87. if len(unprocessed) == 0 {
  88. return nil, nil
  89. } else {
  90. return unprocessed, errors.New("One or more unprocessed items.")
  91. }
  92. }
  93. func (t *Table) GetItem(key *Key) (map[string]*Attribute, error) {
  94. return t.getItem(key, false)
  95. }
  96. func (t *Table) GetItemConsistent(key *Key, consistentRead bool) (map[string]*Attribute, error) {
  97. return t.getItem(key, consistentRead)
  98. }
  99. func (t *Table) getItem(key *Key, consistentRead bool) (map[string]*Attribute, error) {
  100. q := NewQuery(t)
  101. q.AddKey(t, key)
  102. if consistentRead {
  103. q.ConsistentRead(consistentRead)
  104. }
  105. jsonResponse, err := t.Server.queryServer(target("GetItem"), q)
  106. if err != nil {
  107. return nil, err
  108. }
  109. json, err := simplejson.NewJson(jsonResponse)
  110. if err != nil {
  111. return nil, err
  112. }
  113. itemJson, ok := json.CheckGet("Item")
  114. if !ok {
  115. // We got an empty from amz. The item doesn't exist.
  116. return nil, ErrNotFound
  117. }
  118. item, err := itemJson.Map()
  119. if err != nil {
  120. message := fmt.Sprintf("Unexpected response %s", jsonResponse)
  121. return nil, errors.New(message)
  122. }
  123. return parseAttributes(item), nil
  124. }
  125. func (t *Table) PutItem(hashKey string, rangeKey string, attributes []Attribute) (bool, error) {
  126. return t.putItem(hashKey, rangeKey, attributes, nil)
  127. }
  128. func (t *Table) ConditionalPutItem(hashKey, rangeKey string, attributes, expected []Attribute) (bool, error) {
  129. return t.putItem(hashKey, rangeKey, attributes, expected)
  130. }
  131. func (t *Table) putItem(hashKey, rangeKey string, attributes, expected []Attribute) (bool, error) {
  132. if len(attributes) == 0 {
  133. return false, errors.New("At least one attribute is required.")
  134. }
  135. q := NewQuery(t)
  136. keys := t.Key.Clone(hashKey, rangeKey)
  137. attributes = append(attributes, keys...)
  138. q.AddItem(attributes)
  139. if expected != nil {
  140. q.AddExpected(expected)
  141. }
  142. jsonResponse, err := t.Server.queryServer(target("PutItem"), q)
  143. if err != nil {
  144. return false, err
  145. }
  146. _, err = simplejson.NewJson(jsonResponse)
  147. if err != nil {
  148. return false, err
  149. }
  150. return true, nil
  151. }
  152. func (t *Table) deleteItem(key *Key, expected []Attribute) (bool, error) {
  153. q := NewQuery(t)
  154. q.AddKey(t, key)
  155. if expected != nil {
  156. q.AddExpected(expected)
  157. }
  158. jsonResponse, err := t.Server.queryServer(target("DeleteItem"), q)
  159. if err != nil {
  160. return false, err
  161. }
  162. _, err = simplejson.NewJson(jsonResponse)
  163. if err != nil {
  164. return false, err
  165. }
  166. return true, nil
  167. }
  168. func (t *Table) DeleteItem(key *Key) (bool, error) {
  169. return t.deleteItem(key, nil)
  170. }
  171. func (t *Table) ConditionalDeleteItem(key *Key, expected []Attribute) (bool, error) {
  172. return t.deleteItem(key, expected)
  173. }
  174. func (t *Table) AddAttributes(key *Key, attributes []Attribute) (bool, error) {
  175. return t.modifyAttributes(key, attributes, nil, "ADD")
  176. }
  177. func (t *Table) UpdateAttributes(key *Key, attributes []Attribute) (bool, error) {
  178. return t.modifyAttributes(key, attributes, nil, "PUT")
  179. }
  180. func (t *Table) DeleteAttributes(key *Key, attributes []Attribute) (bool, error) {
  181. return t.modifyAttributes(key, attributes, nil, "DELETE")
  182. }
  183. func (t *Table) ConditionalAddAttributes(key *Key, attributes, expected []Attribute) (bool, error) {
  184. return t.modifyAttributes(key, attributes, expected, "ADD")
  185. }
  186. func (t *Table) ConditionalUpdateAttributes(key *Key, attributes, expected []Attribute) (bool, error) {
  187. return t.modifyAttributes(key, attributes, expected, "PUT")
  188. }
  189. func (t *Table) ConditionalDeleteAttributes(key *Key, attributes, expected []Attribute) (bool, error) {
  190. return t.modifyAttributes(key, attributes, expected, "DELETE")
  191. }
  192. func (t *Table) modifyAttributes(key *Key, attributes, expected []Attribute, action string) (bool, error) {
  193. if len(attributes) == 0 {
  194. return false, errors.New("At least one attribute is required.")
  195. }
  196. q := NewQuery(t)
  197. q.AddKey(t, key)
  198. q.AddUpdates(attributes, action)
  199. if expected != nil {
  200. q.AddExpected(expected)
  201. }
  202. jsonResponse, err := t.Server.queryServer(target("UpdateItem"), q)
  203. if err != nil {
  204. return false, err
  205. }
  206. _, err = simplejson.NewJson(jsonResponse)
  207. if err != nil {
  208. return false, err
  209. }
  210. return true, nil
  211. }
  212. func parseAttributes(s map[string]interface{}) map[string]*Attribute {
  213. results := map[string]*Attribute{}
  214. for key, value := range s {
  215. if v, ok := value.(map[string]interface{}); ok {
  216. if val, ok := v[TYPE_STRING].(string); ok {
  217. results[key] = &Attribute{
  218. Type: TYPE_STRING,
  219. Name: key,
  220. Value: val,
  221. }
  222. } else if val, ok := v[TYPE_NUMBER].(string); ok {
  223. results[key] = &Attribute{
  224. Type: TYPE_NUMBER,
  225. Name: key,
  226. Value: val,
  227. }
  228. } else if val, ok := v[TYPE_BINARY].(string); ok {
  229. results[key] = &Attribute{
  230. Type: TYPE_BINARY,
  231. Name: key,
  232. Value: val,
  233. }
  234. } else if vals, ok := v[TYPE_STRING_SET].([]interface{}); ok {
  235. arry := make([]string, len(vals))
  236. for i, ivalue := range vals {
  237. if val, ok := ivalue.(string); ok {
  238. arry[i] = val
  239. }
  240. }
  241. results[key] = &Attribute{
  242. Type: TYPE_STRING_SET,
  243. Name: key,
  244. SetValues: arry,
  245. }
  246. } else if vals, ok := v[TYPE_NUMBER_SET].([]interface{}); ok {
  247. arry := make([]string, len(vals))
  248. for i, ivalue := range vals {
  249. if val, ok := ivalue.(string); ok {
  250. arry[i] = val
  251. }
  252. }
  253. results[key] = &Attribute{
  254. Type: TYPE_NUMBER_SET,
  255. Name: key,
  256. SetValues: arry,
  257. }
  258. } else if vals, ok := v[TYPE_BINARY_SET].([]interface{}); ok {
  259. arry := make([]string, len(vals))
  260. for i, ivalue := range vals {
  261. if val, ok := ivalue.(string); ok {
  262. arry[i] = val
  263. }
  264. }
  265. results[key] = &Attribute{
  266. Type: TYPE_BINARY_SET,
  267. Name: key,
  268. SetValues: arry,
  269. }
  270. }
  271. } else {
  272. log.Printf("type assertion to map[string] interface{} failed for : %s\n ", value)
  273. }
  274. }
  275. return results
  276. }