RunController.php 3.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. <?php
  2. namespace app\console;
  3. use yii;
  4. use yii\console\Controller;
  5. use app\components\ZKafka;
  6. use app\components\KafkaConsumer;
  7. use app\components\KafkaProducer;
  8. /**
  9. * Test controller
  10. */
  11. class RunController extends Controller {
  12. public function actionProduce() {
  13. // $ret = KafkaProducer::error('social.touch', '找不到数据');
  14. $ret = KafkaProducer::warning('social.touch', '只能切回去不能切回来');
  15. // $ret = KafkaProducer::notice('social.touch', '记录下数据');
  16. echo "\n==========================\n";
  17. dd($ret);
  18. }
  19. public function actionConsume() {
  20. return KafkaConsumer::process(KafkaProducer::TOPIC_ERROR, 10);
  21. }
  22. public function actionMongo() {
  23. $offset = ZKafka::getOffset(ZKafka::KAFKA_ERROR_OFFSET);
  24. $colError = Yii::$app->mongodb->getCollection(KafkaProducer::LOG);
  25. while (true) {
  26. echo 'offset:', $offset, PHP_EOL;
  27. $logs = KafkaConsumer::process(KafkaProducer::TOPIC_ERROR, $offset, 10);
  28. if (!$logs) {
  29. echo 'empty. sleep for a while...', PHP_EOL;
  30. sleep(5);
  31. continue;
  32. }
  33. foreach ($logs as $log) {
  34. $log = json_decode($log, true);
  35. if (empty($log)) continue;
  36. $ret = $colError->insert($log);
  37. echo $ret ? 'done' : 'error', PHP_EOL;
  38. }
  39. }
  40. }
  41. public function actionLog($level = KafkaProducer::LEVEL_ERROR) {
  42. if (!in_array($level, [KafkaProducer::LEVEL_ERROR, KafkaProducer::LEVEL_WARNING, KafkaProducer::LEVEL_NOTICE])) {
  43. throw new \Exception('暂无配置此错误级别:' . $level);
  44. }
  45. $limit = 10;
  46. $offsetName = 'kafka_' . $level . '_offset';
  47. $offset = ZKafka::getOffset($offsetName);
  48. $colError = Yii::$app->mongodb->getCollection(KafkaProducer::LOG);
  49. while (true) {
  50. echo 'offset:', $offset, PHP_EOL;
  51. $logs = KafkaConsumer::process($level, $offset, $limit);
  52. if (!$logs) {
  53. echo 'empty. sleep for a while...', PHP_EOL;
  54. sleep(5);
  55. continue;
  56. }
  57. foreach ($logs as $log) {
  58. $log = json_decode($log, true);
  59. if (empty($log)) continue;
  60. $ret = $colError->insert($log);
  61. echo $ret ? 'done' : 'error', PHP_EOL;
  62. if ($ret) {
  63. ZKafka::setOffset($offsetName, ++$offset);
  64. }
  65. }
  66. }
  67. }
  68. public function actionSetoffset() {
  69. $offset = ZKafka::getOffset(ZKafka::KAFKA_ERROR_OFFSET);
  70. d($offset);
  71. $ret = ZKafka::setOffset(ZKafka::KAFKA_ERROR_OFFSET, --$offset);
  72. d($ret);
  73. $offset = ZKafka::getOffset(ZKafka::KAFKA_ERROR_OFFSET);
  74. d($offset);
  75. }
  76. public function actionCost() {
  77. $ret = Zkafka::produce('adOffline', 'planId|1|xxxdasflkd');
  78. dd($ret);
  79. }
  80. }