ZKafka.php 1.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960
  1. <?php
  2. /* *****************************************************************
  3. * @Author: wushuiyong
  4. * @Created Time : Sat 7/18 23:02:36 2015
  5. *
  6. * @File Name: ZKafka.php
  7. * @Description:
  8. * *****************************************************************/
  9. namespace app\components;
  10. use app\models\KafkaOffset;
  /**
   * Static facade around a single, lazily created \Kafka client, plus helpers
   * that read and write per-topic offsets through the KafkaOffset model.
   *
   * NOTE(review): \Kafka and KafkaOffset are defined outside this file; the
   * comments below describe only how this class uses them.
   */
  class ZKafka {

      // Kafka offset names.
      // NOTE(review): presumably used as the `name` key passed to
      // setOffset()/getOffset() - confirm against callers.
      const KAFKA_OFFSET = 'kafka_offset';
      const KAFKA_ERROR_OFFSET = 'kafka_error_offset';
      const KAFKA_WARNING_OFFSET = 'kafka_warning_offset';
      const KAFKA_NOTICE_OFFSET = 'kafka_notice_offset';

      /** @var \Kafka|null Shared client, created on first use by getInstance(). */
      private static $_kafka;

      /**
       * Forwards a message to the shared client's produce() method.
       *
       * @param mixed $topic   passed through unchanged as the first argument
       * @param mixed $message passed through unchanged as the second argument
       * @return mixed whatever \Kafka::produce() returns
       */
      public static function produce($topic, $message) {
          return static::getInstance()->produce($topic, $message);
      }

      /**
       * Forwards a read request to the shared client's consume() method.
       *
       * @param mixed $topic passed through unchanged as the first argument
       * @param mixed $begin passed through unchanged (presumably the start offset - confirm)
       * @param mixed $limit passed through unchanged (presumably the max message count - confirm)
       * @return mixed whatever \Kafka::consume() returns
       */
      public static function consume($topic, $begin, $limit) {
          return static::getInstance()->consume($topic, $begin, $limit);
      }

      /**
       * Returns the shared \Kafka client, constructing it on the first call.
       *
       * The private static is addressed through self:: rather than static::,
       * so the lookup always targets the property declared in this class even
       * when the method is invoked through a subclass.
       *
       * @return \Kafka
       */
      public static function getInstance() {
          if (!is_object(self::$_kafka)) {
              // NOTE(review): the broker address is hard-coded here.
              self::$_kafka = new \Kafka("localhost:9092");
          }

          return self::$_kafka;
      }

      /**
       * Stores $offset for $topic, creating the KafkaOffset record when none
       * with that name exists yet.
       *
       * @param mixed $topic  value matched against / written to the record's `name`
       * @param mixed $offset value written to the record's `offset`
       * @return mixed whatever KafkaOffset::save() returns
       *               (presumably a success boolean - confirm against the model)
       */
      public static function setOffset($topic, $offset) {
          $kafkaOffset = KafkaOffset::findOne(['name' => $topic]);

          if (!$kafkaOffset) {
              // No record for this topic yet: start a fresh one.
              $kafkaOffset = new KafkaOffset();
              $kafkaOffset->name = $topic;
          }

          $kafkaOffset->offset = $offset;

          return $kafkaOffset->save();
      }

      /**
       * Looks up the stored offset for $topic.
       *
       * @param mixed $topic value matched against the record's `name`
       * @return mixed the record's `offset`, or 0 when no record was found
       */
      public static function getOffset($topic) {
          $kafkaOffset = KafkaOffset::findOne(['name' => $topic]);

          return $kafkaOffset ? $kafkaOffset->offset : 0;
      }
  }