waller.py 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. # @Author: wushuiyong
  4. # @Created Time : 日 1/ 1 23:43:12 2017
  5. # @Description:
  6. from fabric2 import Connection
  7. from flask import current_app
  8. from flask_socketio import emit
  9. from walle.model.record import RecordModel
  10. from invoke import Result
  11. from walle.service.code import Code
  12. from walle.service.utils import say_yes
  13. class Waller(Connection):
  14. connections, success, errors = {}, {}, {}
  15. release_version_tar, release_version = None, None
  16. def run(self, command, wenv=None, sudo=False, pty=True, exception=True, **kwargs):
  17. '''
  18. pty=True/False是直接影响到输出.False较适合在获取文本,True更适合websocket
  19. :param command:
  20. :param wenv:
  21. :param sudo: False/True default False
  22. :param exception: False/True default True
  23. False return Result(exited=xx, stderr=xx, stdout=xx) for process to raise custom exception by exited code
  24. True raise Exception
  25. :param kwargs:
  26. :return:
  27. '''
  28. message = 'deploying task_id=%s [%s@%s]$ %s ' % (wenv['task_id'], self.user, self.host, command)
  29. current_app.logger.info(message)
  30. try:
  31. if sudo:
  32. result = super(Waller, self).sudo(command, pty=pty, **kwargs)
  33. else:
  34. result = super(Waller, self).run(command, pty=pty, warn=True, watchers=[say_yes()], **kwargs)
  35. if result.failed:
  36. exitcode, stdout, stderr = result.exited, '', result.stdout
  37. else:
  38. exitcode, stdout, stderr = 0, result.stdout, ''
  39. message = 'task_id=%s, user:%s host:%s command:%s status:%s, success:%s, error:%s' % (
  40. wenv['task_id'], self.user, self.host, command, exitcode, stdout, stderr
  41. )
  42. # TODO
  43. ws_dict = {
  44. 'user': self.user,
  45. 'host': self.host,
  46. 'cmd': command,
  47. 'status': exitcode,
  48. 'stage': wenv['stage'],
  49. 'sequence': wenv['sequence'],
  50. 'success': stdout,
  51. 'error': stderr,
  52. }
  53. if wenv['console']:
  54. emit('console', {'event': 'task:console', 'data': ws_dict}, room=wenv['task_id'])
  55. RecordModel().save_record(stage=wenv['stage'], sequence=wenv['sequence'], user_id=wenv['user_id'],
  56. task_id=wenv['task_id'], status=exitcode, host=self.host, user=self.user,
  57. command=result.command, success=stdout,
  58. error=stderr)
  59. current_app.logger.info(result)
  60. if exitcode != Code.Ok:
  61. current_app.logger.error(message, exc_info=1)
  62. current_app.logger.exception(result.stdout.strip(), exc_info=1)
  63. return result
  64. return result
  65. except Exception as e:
  66. current_app.logger.exception(e)
  67. # TODO 貌似可能的异常有很多种,需要分层才能完美解决 something wrong without e.result
  68. error = e.result if 'result' in e else e.message
  69. RecordModel().save_record(stage=wenv['stage'], sequence=wenv['sequence'], user_id=wenv['user_id'],
  70. task_id=wenv['task_id'], status=1, host=self.host, user=self.user,
  71. command=command, success='', error=error)
  72. if hasattr(e, 'reason') and hasattr(e, 'result'):
  73. message = 'task_id=%s, user:%s host:%s command:%s, status=1, reason:%s, result:%s exception:%s' % (
  74. wenv['task_id'], self.user, self.host, command, e.reason, error, e.message
  75. )
  76. else:
  77. message = 'task_id=%s, user:%s host:%s command:%s, status=1, message:%s' % (
  78. wenv['task_id'], self.user, self.host, command, e.message
  79. )
  80. current_app.logger.error(message, exc_info=1)
  81. # TODO
  82. ws_dict = {
  83. 'user': self.user,
  84. 'host': self.host,
  85. 'cmd': command,
  86. 'status': 1,
  87. 'stage': wenv['stage'],
  88. 'sequence': wenv['sequence'],
  89. 'success': '',
  90. 'error': error,
  91. }
  92. if wenv['console']:
  93. emit('console', {'event': 'console', 'data': ws_dict}, room=wenv['task_id'])
  94. if exception:
  95. raise e
  96. return Result(exited=-1, stderr=error, stdout=error)
  97. def sudo(self, command, wenv=None, **kwargs):
  98. return self.run(command, wenv=wenv, sudo=True, **kwargs)
  99. def get(self, remote, local=None, wenv=None):
  100. return self.sync(wtype='get', remote=remote, local=local, wenv=wenv)
  101. def put(self, local, remote=None, wenv=None, *args, **kwargs):
  102. return self.sync(wtype='put', local=local, remote=remote, wenv=wenv, *args, **kwargs)
  103. def sync(self, wtype, remote=None, local=None, wenv=None):
  104. command = 'scp %s %s@%s:%s' % (local, self.user, self.host, remote) if wtype == 'put' \
  105. else 'scp %s@%s:%s %s' % (self.user, self.host, remote, local)
  106. message = 'deploying task_id=%s [%s@%s]$ %s ' % (wenv['task_id'], self.user, self.host, command)
  107. current_app.logger.info(message)
  108. try:
  109. if wtype == 'put':
  110. result = super(Waller, self).put(local=local, remote=remote)
  111. current_app.logger.info('put: local %s, remote %s', local, remote)
  112. op_user = current_app.config.get('LOCAL_SERVER_USER')
  113. op_host = current_app.config.get('LOCAL_SERVER_HOST')
  114. else:
  115. result = super(Waller, self).get(remote=remote, local=local)
  116. current_app.logger.info('get: local %s, remote %s', local, remote)
  117. current_app.logger.info('get: orig_local %s, local %s', result.orig_local, result.local)
  118. op_user = self.user
  119. op_host = self.host
  120. current_app.logger.info('put: %s, %s', result, dir(result))
  121. # TODO 可能会有非22端口的问题
  122. RecordModel().save_record(stage=wenv['stage'], sequence=wenv['sequence'], user_id=wenv['user_id'],
  123. task_id=wenv['task_id'], status=0, host=self.host, user=self.user,
  124. command=command, )
  125. # TODO
  126. ws_dict = {
  127. 'user': op_user,
  128. 'host': op_host,
  129. 'cmd': command,
  130. 'status': 1,
  131. 'stage': wenv['stage'],
  132. 'sequence': wenv['sequence'],
  133. 'success': '',
  134. 'error': '',
  135. }
  136. if wenv['console']:
  137. emit('console', {'event': 'task:console', 'data': ws_dict}, room=wenv['task_id'])
  138. return result
  139. except Exception as e:
  140. # TODO 收尾下
  141. current_app.logger.info('put: %s, %s', e, dir(e))
  142. # TODO command
  143. ws_dict = {
  144. 'user': self.user,
  145. 'host': self.host,
  146. 'cmd': command,
  147. 'status': 1,
  148. 'stage': wenv['stage'],
  149. 'sequence': wenv['sequence'],
  150. 'success': '',
  151. 'error': e.message,
  152. }
  153. if wenv['console']:
  154. emit('console', {'event': 'task:console', 'data': ws_dict}, room=wenv['task_id'])