You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

713 lines
16 KiB

  1. package diskqueue
  2. import (
  3. "bufio"
  4. "bytes"
  5. "encoding/binary"
  6. "errors"
  7. "fmt"
  8. "io"
  9. "math/rand"
  10. "os"
  11. "path"
  12. "sync"
  13. "time"
  14. )
  15. // logging stuff copied from github.com/nsqio/nsq/internal/lg
  16. type LogLevel int
  17. const (
  18. DEBUG = LogLevel(1)
  19. INFO = LogLevel(2)
  20. WARN = LogLevel(3)
  21. ERROR = LogLevel(4)
  22. FATAL = LogLevel(5)
  23. )
  24. type AppLogFunc func(lvl LogLevel, f string, args ...interface{})
  25. func (l LogLevel) String() string {
  26. switch l {
  27. case 1:
  28. return "DEBUG"
  29. case 2:
  30. return "INFO"
  31. case 3:
  32. return "WARNING"
  33. case 4:
  34. return "ERROR"
  35. case 5:
  36. return "FATAL"
  37. }
  38. panic("invalid LogLevel")
  39. }
// Interface is the contract returned by New: a filesystem-backed FIFO
// of opaque []byte messages.
type Interface interface {
	// Put appends a message to the queue, returning any write error.
	Put([]byte) error
	ReadChan() <-chan []byte // this is expected to be an *unbuffered* channel
	// Close persists state and shuts down; Delete shuts down without syncing.
	Close() error
	Delete() error
	// Depth reports the number of messages currently queued.
	Depth() int64
	// Empty destructively discards all pending messages.
	Empty() error
}
// diskQueue implements a filesystem backed FIFO queue
type diskQueue struct {
	// 64bit atomic vars need to be first for proper alignment on 32bit platforms

	// run-time state (also persisted to disk)
	readPos      int64
	writePos     int64
	readFileNum  int64
	writeFileNum int64
	depth        int64 // number of messages currently in the queue

	sync.RWMutex

	// instantiation time metadata
	name                string
	dataPath            string
	maxBytesPerFile     int64 // cannot change once created
	maxBytesPerFileRead int64 // effective size of the file being read (see readOne/writeOne)
	minMsgSize          int32
	maxMsgSize          int32
	syncEvery           int64         // number of writes per fsync
	syncTimeout         time.Duration // duration of time per fsync
	exitFlag            int32         // set to 1 (under the write lock) once exit() begins
	needSync            bool          // set when state changed; ioLoop persists on next iteration

	// keeps track of the position where we have read
	// (but not yet sent over readChan)
	nextReadPos     int64
	nextReadFileNum int64

	readFile  *os.File
	writeFile *os.File
	reader    *bufio.Reader
	writeBuf  bytes.Buffer // scratch buffer so each message is written with one file Write

	// exposed via ReadChan()
	readChan chan []byte

	// internal channels (all unbuffered; ioLoop is the sole servicer)
	depthChan         chan int64
	writeChan         chan []byte
	writeResponseChan chan error
	emptyChan         chan int
	emptyResponseChan chan error
	exitChan          chan int // closed by exit() to stop ioLoop
	exitSyncChan      chan int // ioLoop signals here once it has returned

	logf AppLogFunc
}
  89. // New instantiates an instance of diskQueue, retrieving metadata
  90. // from the filesystem and starting the read ahead goroutine
  91. func New(name string, dataPath string, maxBytesPerFile int64,
  92. minMsgSize int32, maxMsgSize int32,
  93. syncEvery int64, syncTimeout time.Duration, logf AppLogFunc) Interface {
  94. d := diskQueue{
  95. name: name,
  96. dataPath: dataPath,
  97. maxBytesPerFile: maxBytesPerFile,
  98. minMsgSize: minMsgSize,
  99. maxMsgSize: maxMsgSize,
  100. readChan: make(chan []byte),
  101. depthChan: make(chan int64),
  102. writeChan: make(chan []byte),
  103. writeResponseChan: make(chan error),
  104. emptyChan: make(chan int),
  105. emptyResponseChan: make(chan error),
  106. exitChan: make(chan int),
  107. exitSyncChan: make(chan int),
  108. syncEvery: syncEvery,
  109. syncTimeout: syncTimeout,
  110. logf: logf,
  111. }
  112. // no need to lock here, nothing else could possibly be touching this instance
  113. err := d.retrieveMetaData()
  114. if err != nil && !os.IsNotExist(err) {
  115. d.logf(ERROR, "DISKQUEUE(%s) failed to retrieveMetaData - %s", d.name, err)
  116. }
  117. go d.ioLoop()
  118. return &d
  119. }
  120. // Depth returns the depth of the queue
  121. func (d *diskQueue) Depth() int64 {
  122. depth, ok := <-d.depthChan
  123. if !ok {
  124. // ioLoop exited
  125. depth = d.depth
  126. }
  127. return depth
  128. }
// ReadChan returns the receive-only []byte channel for reading data
//
// ioLoop only offers this (unbuffered) channel in its select when a
// message has been read ahead, so each receive consumes one message.
func (d *diskQueue) ReadChan() <-chan []byte {
	return d.readChan
}
// Put writes a []byte to the queue
//
// The read lock only excludes exit() (which takes the write lock);
// concurrent Puts may proceed and are serialized by ioLoop via writeChan.
func (d *diskQueue) Put(data []byte) error {
	d.RLock()
	defer d.RUnlock()

	if d.exitFlag == 1 {
		return errors.New("exiting")
	}

	// hand the data to ioLoop and block for the result of its writeOne
	d.writeChan <- data
	return <-d.writeResponseChan
}
  143. // Close cleans up the queue and persists metadata
  144. func (d *diskQueue) Close() error {
  145. err := d.exit(false)
  146. if err != nil {
  147. return err
  148. }
  149. return d.sync()
  150. }
// Delete stops the queue without persisting metadata afterwards
// (contrast with Close, which calls sync after exiting).
func (d *diskQueue) Delete() error {
	return d.exit(true)
}
// exit flags the queue as exiting, stops ioLoop, and closes any open
// file handles. The deleted flag only selects the log message here.
func (d *diskQueue) exit(deleted bool) error {
	d.Lock()
	defer d.Unlock()

	// setting exitFlag under the write lock guarantees any in-flight
	// Put/Empty (which hold the read lock) has completed first
	d.exitFlag = 1

	if deleted {
		d.logf(INFO, "DISKQUEUE(%s): deleting", d.name)
	} else {
		d.logf(INFO, "DISKQUEUE(%s): closing", d.name)
	}

	close(d.exitChan)
	// ensure that ioLoop has exited
	<-d.exitSyncChan

	// closing depthChan unblocks Depth() callers, which then fall back
	// to reading d.depth directly
	close(d.depthChan)

	if d.readFile != nil {
		d.readFile.Close()
		d.readFile = nil
	}
	if d.writeFile != nil {
		d.writeFile.Close()
		d.writeFile = nil
	}

	return nil
}
// Empty destructively clears out any pending data in the queue
// by fast forwarding read positions and removing intermediate files
//
// Like Put, it holds the read lock only to exclude exit(), and delegates
// the actual work to ioLoop over emptyChan.
func (d *diskQueue) Empty() error {
	d.RLock()
	defer d.RUnlock()

	if d.exitFlag == 1 {
		return errors.New("exiting")
	}

	d.logf(INFO, "DISKQUEUE(%s): emptying", d.name)

	// ask ioLoop to delete all files and block for the result
	d.emptyChan <- 1
	return <-d.emptyResponseChan
}
  189. func (d *diskQueue) deleteAllFiles() error {
  190. err := d.skipToNextRWFile()
  191. innerErr := os.Remove(d.metaDataFileName())
  192. if innerErr != nil && !os.IsNotExist(innerErr) {
  193. d.logf(ERROR, "DISKQUEUE(%s) failed to remove metadata file - %s", d.name, innerErr)
  194. return innerErr
  195. }
  196. return err
  197. }
  198. func (d *diskQueue) skipToNextRWFile() error {
  199. var err error
  200. if d.readFile != nil {
  201. d.readFile.Close()
  202. d.readFile = nil
  203. }
  204. if d.writeFile != nil {
  205. d.writeFile.Close()
  206. d.writeFile = nil
  207. }
  208. for i := d.readFileNum; i <= d.writeFileNum; i++ {
  209. fn := d.fileName(i)
  210. innerErr := os.Remove(fn)
  211. if innerErr != nil && !os.IsNotExist(innerErr) {
  212. d.logf(ERROR, "DISKQUEUE(%s) failed to remove data file - %s", d.name, innerErr)
  213. err = innerErr
  214. }
  215. }
  216. d.writeFileNum++
  217. d.writePos = 0
  218. d.readFileNum = d.writeFileNum
  219. d.readPos = 0
  220. d.nextReadFileNum = d.writeFileNum
  221. d.nextReadPos = 0
  222. d.depth = 0
  223. return err
  224. }
// readOne performs a low level filesystem read for a single []byte
// while advancing read positions and rolling files, if necessary
//
// On-disk format: each message is a 4-byte big-endian length prefix
// followed by the payload (see writeOne).
func (d *diskQueue) readOne() ([]byte, error) {
	var err error
	var msgSize int32

	// lazily open the current read file and seek to the saved position
	if d.readFile == nil {
		curFileName := d.fileName(d.readFileNum)
		d.readFile, err = os.OpenFile(curFileName, os.O_RDONLY, 0600)
		if err != nil {
			return nil, err
		}

		d.logf(INFO, "DISKQUEUE(%s): readOne() opened %s", d.name, curFileName)

		if d.readPos > 0 {
			_, err = d.readFile.Seek(d.readPos, 0)
			if err != nil {
				d.readFile.Close()
				d.readFile = nil
				return nil, err
			}
		}

		// for "complete" files (i.e. not the "current" file), maxBytesPerFileRead
		// should be initialized to the file's size, or default to maxBytesPerFile
		d.maxBytesPerFileRead = d.maxBytesPerFile
		if d.readFileNum < d.writeFileNum {
			// err is deliberately shadowed: on Stat failure we silently
			// keep the maxBytesPerFile default set above
			stat, err := d.readFile.Stat()
			if err == nil {
				d.maxBytesPerFileRead = stat.Size()
			}
		}

		d.reader = bufio.NewReader(d.readFile)
	}

	err = binary.Read(d.reader, binary.BigEndian, &msgSize)
	if err != nil {
		d.readFile.Close()
		d.readFile = nil
		return nil, err
	}

	if msgSize < d.minMsgSize || msgSize > d.maxMsgSize {
		// this file is corrupt and we have no reasonable guarantee on
		// where a new message should begin
		d.readFile.Close()
		d.readFile = nil
		return nil, fmt.Errorf("invalid message read size (%d)", msgSize)
	}

	readBuf := make([]byte, msgSize)
	_, err = io.ReadFull(d.reader, readBuf)
	if err != nil {
		d.readFile.Close()
		d.readFile = nil
		return nil, err
	}

	// 4 bytes of length prefix + the payload
	totalBytes := int64(4 + msgSize)

	// we only advance next* because we have not yet sent this to consumers
	// (where readFileNum, readPos will actually be advanced)
	d.nextReadPos = d.readPos + totalBytes
	d.nextReadFileNum = d.readFileNum

	// we only consider rotating if we're reading a "complete" file
	// and since we cannot know the size at which it was rotated, we
	// rely on maxBytesPerFileRead rather than maxBytesPerFile
	if d.readFileNum < d.writeFileNum && d.nextReadPos >= d.maxBytesPerFileRead {
		if d.readFile != nil {
			d.readFile.Close()
			d.readFile = nil
		}

		d.nextReadFileNum++
		d.nextReadPos = 0
	}

	return readBuf, nil
}
// writeOne performs a low level filesystem write for a single []byte
// while advancing write positions and rolling files, if necessary
//
// Each message is stored as a 4-byte big-endian length prefix followed
// by the payload (the format readOne expects).
func (d *diskQueue) writeOne(data []byte) error {
	var err error

	dataLen := int32(len(data))
	totalBytes := int64(4 + dataLen) // length prefix + payload

	if dataLen < d.minMsgSize || dataLen > d.maxMsgSize {
		return fmt.Errorf("invalid message write size (%d) minMsgSize=%d maxMsgSize=%d", dataLen, d.minMsgSize, d.maxMsgSize)
	}

	// rotate to a new file if this write would push past maxBytesPerFile
	// will not wrap-around if maxBytesPerFile + maxMsgSize < Int64Max
	if d.writePos > 0 && d.writePos+totalBytes > d.maxBytesPerFile {
		if d.readFileNum == d.writeFileNum {
			// the reader is on this same file: record its final size so
			// readOne knows where it ends (see maxBytesPerFileRead)
			d.maxBytesPerFileRead = d.writePos
		}

		d.writeFileNum++
		d.writePos = 0

		// sync every time we start writing to a new file
		err = d.sync()
		if err != nil {
			d.logf(ERROR, "DISKQUEUE(%s) failed to sync - %s", d.name, err)
		}

		if d.writeFile != nil {
			d.writeFile.Close()
			d.writeFile = nil
		}
	}

	// lazily (re)open the current write file and seek to the write position
	if d.writeFile == nil {
		curFileName := d.fileName(d.writeFileNum)
		d.writeFile, err = os.OpenFile(curFileName, os.O_RDWR|os.O_CREATE, 0600)
		if err != nil {
			return err
		}

		d.logf(INFO, "DISKQUEUE(%s): writeOne() opened %s", d.name, curFileName)

		if d.writePos > 0 {
			_, err = d.writeFile.Seek(d.writePos, 0)
			if err != nil {
				d.writeFile.Close()
				d.writeFile = nil
				return err
			}
		}
	}

	// stage prefix + payload in writeBuf so the file sees a single Write
	d.writeBuf.Reset()
	err = binary.Write(&d.writeBuf, binary.BigEndian, dataLen)
	if err != nil {
		return err
	}

	_, err = d.writeBuf.Write(data)
	if err != nil {
		return err
	}

	// only write to the file once
	_, err = d.writeFile.Write(d.writeBuf.Bytes())
	if err != nil {
		d.writeFile.Close()
		d.writeFile = nil
		return err
	}

	d.writePos += totalBytes
	d.depth += 1

	return err
}
  356. // sync fsyncs the current writeFile and persists metadata
  357. func (d *diskQueue) sync() error {
  358. if d.writeFile != nil {
  359. err := d.writeFile.Sync()
  360. if err != nil {
  361. d.writeFile.Close()
  362. d.writeFile = nil
  363. return err
  364. }
  365. }
  366. err := d.persistMetaData()
  367. if err != nil {
  368. return err
  369. }
  370. d.needSync = false
  371. return nil
  372. }
// retrieveMetaData initializes state from the filesystem
//
// Metadata file format (see persistMetaData):
//
//	depth
//	readFileNum,readPos
//	writeFileNum,writePos
func (d *diskQueue) retrieveMetaData() error {
	var f *os.File
	var err error

	fileName := d.metaDataFileName()
	f, err = os.OpenFile(fileName, os.O_RDONLY, 0600)
	if err != nil {
		return err
	}
	defer f.Close()

	var depth int64
	_, err = fmt.Fscanf(f, "%d\n%d,%d\n%d,%d\n",
		&depth,
		&d.readFileNum, &d.readPos,
		&d.writeFileNum, &d.writePos)
	if err != nil {
		return err
	}
	d.depth = depth
	// read-ahead positions start at the persisted read position
	d.nextReadFileNum = d.readFileNum
	d.nextReadPos = d.readPos

	// if the metadata was not sync'd at the last shutdown of nsqd
	// then the actual file size might actually be larger than the writePos,
	// in which case the safest thing to do is skip to the next file for
	// writes, and let the reader salvage what it can from the messages in the
	// diskqueue beyond the metadata's likely also stale readPos
	fileName = d.fileName(d.writeFileNum)
	fileInfo, err := os.Stat(fileName)
	if err != nil {
		return err
	}
	fileSize := fileInfo.Size()
	if d.writePos < fileSize {
		d.logf(WARN,
			"DISKQUEUE(%s) %s metadata writePos %d < file size of %d, skipping to new file",
			d.name, fileName, d.writePos, fileSize)
		d.writeFileNum += 1
		d.writePos = 0
		// close the stale handle so writeOne reopens the new file
		if d.writeFile != nil {
			d.writeFile.Close()
			d.writeFile = nil
		}
	}

	return nil
}
  418. // persistMetaData atomically writes state to the filesystem
  419. func (d *diskQueue) persistMetaData() error {
  420. var f *os.File
  421. var err error
  422. fileName := d.metaDataFileName()
  423. tmpFileName := fmt.Sprintf("%s.%d.tmp", fileName, rand.Int())
  424. // write to tmp file
  425. f, err = os.OpenFile(tmpFileName, os.O_RDWR|os.O_CREATE, 0600)
  426. if err != nil {
  427. return err
  428. }
  429. _, err = fmt.Fprintf(f, "%d\n%d,%d\n%d,%d\n",
  430. d.depth,
  431. d.readFileNum, d.readPos,
  432. d.writeFileNum, d.writePos)
  433. if err != nil {
  434. f.Close()
  435. return err
  436. }
  437. f.Sync()
  438. f.Close()
  439. // atomically rename
  440. return os.Rename(tmpFileName, fileName)
  441. }
  442. func (d *diskQueue) metaDataFileName() string {
  443. return fmt.Sprintf(path.Join(d.dataPath, "%s.diskqueue.meta.dat"), d.name)
  444. }
  445. func (d *diskQueue) fileName(fileNum int64) string {
  446. return fmt.Sprintf(path.Join(d.dataPath, "%s.diskqueue.%06d.dat"), d.name, fileNum)
  447. }
// checkTailCorruption validates internal state once the reader has
// reached the tail of the queue: depth must be zero and the read
// position must exactly equal the write position, otherwise state is
// forcibly reset and a sync is scheduled.
func (d *diskQueue) checkTailCorruption(depth int64) {
	// nothing to check until the reader has caught up with the writer
	if d.readFileNum < d.writeFileNum || d.readPos < d.writePos {
		return
	}

	// we've reached the end of the diskqueue
	// if depth isn't 0 something went wrong
	if depth != 0 {
		if depth < 0 {
			d.logf(ERROR,
				"DISKQUEUE(%s) negative depth at tail (%d), metadata corruption, resetting 0...",
				d.name, depth)
		} else if depth > 0 {
			d.logf(ERROR,
				"DISKQUEUE(%s) positive depth at tail (%d), data loss, resetting 0...",
				d.name, depth)
		}
		// force set depth 0
		d.depth = 0
		d.needSync = true
	}

	// at the tail the read position must equal the write position exactly;
	// anything past it means the positions themselves are corrupt
	if d.readFileNum != d.writeFileNum || d.readPos != d.writePos {
		if d.readFileNum > d.writeFileNum {
			d.logf(ERROR,
				"DISKQUEUE(%s) readFileNum > writeFileNum (%d > %d), corruption, skipping to next writeFileNum and resetting 0...",
				d.name, d.readFileNum, d.writeFileNum)
		}
		if d.readPos > d.writePos {
			d.logf(ERROR,
				"DISKQUEUE(%s) readPos > writePos (%d > %d), corruption, skipping to next writeFileNum and resetting 0...",
				d.name, d.readPos, d.writePos)
		}
		d.skipToNextRWFile()
		d.needSync = true
	}
}
  483. func (d *diskQueue) moveForward() {
  484. oldReadFileNum := d.readFileNum
  485. d.readFileNum = d.nextReadFileNum
  486. d.readPos = d.nextReadPos
  487. d.depth -= 1
  488. // see if we need to clean up the old file
  489. if oldReadFileNum != d.nextReadFileNum {
  490. // sync every time we start reading from a new file
  491. d.needSync = true
  492. fn := d.fileName(oldReadFileNum)
  493. err := os.Remove(fn)
  494. if err != nil {
  495. d.logf(ERROR, "DISKQUEUE(%s) failed to Remove(%s) - %s", d.name, fn, err)
  496. }
  497. }
  498. d.checkTailCorruption(d.depth)
  499. }
// handleReadError recovers from a failed readOne by renaming the
// current (bad) read file aside with a ".bad" suffix and advancing the
// read position to the start of the next file.
func (d *diskQueue) handleReadError() {
	// jump to the next read file and rename the current (bad) file
	if d.readFileNum == d.writeFileNum {
		// if you can't properly read from the current write file it's safe to
		// assume that something is fucked and we should skip the current file too
		if d.writeFile != nil {
			d.writeFile.Close()
			d.writeFile = nil
		}
		d.writeFileNum++
		d.writePos = 0
	}

	badFn := d.fileName(d.readFileNum)
	badRenameFn := badFn + ".bad"

	d.logf(WARN,
		"DISKQUEUE(%s) jump to next file and saving bad file as %s",
		d.name, badRenameFn)

	// the bad file is kept (renamed, not deleted) for post-mortem inspection
	err := os.Rename(badFn, badRenameFn)
	if err != nil {
		d.logf(ERROR,
			"DISKQUEUE(%s) failed to rename bad diskqueue file %s to %s",
			d.name, badFn, badRenameFn)
	}

	d.readFileNum++
	d.readPos = 0
	d.nextReadFileNum = d.readFileNum
	d.nextReadPos = 0

	// significant state change, schedule a sync on the next iteration
	d.needSync = true

	d.checkTailCorruption(d.depth)
}
// ioLoop provides the backend for exposing a go channel (via ReadChan())
// in support of multiple concurrent queue consumers
//
// it works by looping and branching based on whether or not the queue has data
// to read and blocking until data is either read or written over the appropriate
// go channels
//
// conveniently this also means that we're asynchronously reading from the filesystem
func (d *diskQueue) ioLoop() {
	var dataRead []byte
	var err error
	var count int64 // reads+writes since the last sync
	var r chan []byte

	syncTicker := time.NewTicker(d.syncTimeout)

	for {
		// dont sync all the time :)
		if count == d.syncEvery {
			d.needSync = true
		}

		if d.needSync {
			err = d.sync()
			if err != nil {
				d.logf(ERROR, "DISKQUEUE(%s) failed to sync - %s", d.name, err)
			}
			count = 0
		}

		// is there data pending between the read and write positions?
		if (d.readFileNum < d.writeFileNum) || (d.readPos < d.writePos) {
			// only read ahead when the previously read message has been
			// delivered (moveForward advances readPos to nextReadPos)
			if d.nextReadPos == d.readPos {
				dataRead, err = d.readOne()
				if err != nil {
					d.logf(ERROR, "DISKQUEUE(%s) reading at %d of %s - %s",
						d.name, d.readPos, d.fileName(d.readFileNum), err)
					d.handleReadError()
					continue
				}
			}
			r = d.readChan
		} else {
			r = nil
		}

		select {
		// the Go channel spec dictates that nil channel operations (read or write)
		// in a select are skipped, we set r to d.readChan only when there is data to read
		case r <- dataRead:
			count++
			// moveForward sets needSync flag if a file is removed
			d.moveForward()
		case d.depthChan <- d.depth:
		case <-d.emptyChan:
			d.emptyResponseChan <- d.deleteAllFiles()
			count = 0
		case dataWrite := <-d.writeChan:
			count++
			d.writeResponseChan <- d.writeOne(dataWrite)
		case <-syncTicker.C:
			if count == 0 {
				// avoid sync when there's no activity
				continue
			}
			d.needSync = true
		case <-d.exitChan:
			goto exit
		}
	}

exit:
	d.logf(INFO, "DISKQUEUE(%s): closing ... ioLoop", d.name)
	syncTicker.Stop()
	// unblock exit(), which is waiting for ioLoop to finish
	d.exitSyncChan <- 1
}