waller.py 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. # @Author: wushuiyong
  4. # @Created Time : 日 1/ 1 23:43:12 2017
  5. # @Description:
  6. import os
  7. import pwd
  8. from fabric2 import Connection
  9. from flask import current_app
  10. from flask_socketio import emit
  11. from walle.model.record import RecordModel
  12. from invoke import Result
  13. from walle.service.code import Code
  14. from walle.service.utils import say_yes
  15. class Waller(Connection):
  16. run_mode_sudo = 'sudo'
  17. run_mode_remote = 'remote'
  18. run_mode_local = 'local'
  19. connections, success, errors = {}, {}, {}
  20. release_version_tar, release_version = None, None
  21. custom_global_env = {}
  22. def init_env(self, env):
  23. self.custom_global_env = env
  24. def run(self, command, wenv=None, run_mode=run_mode_remote, pty=True, exception=True, **kwargs):
  25. '''
  26. pty=True/False是直接影响到输出.False较适合在获取文本,True更适合websocket
  27. :param command:
  28. :param wenv:
  29. :param sudo: False/True default False
  30. :param exception: False/True default True
  31. False return Result(exited=xx, stderr=xx, stdout=xx) for process to raise custom exception by exited code
  32. True raise Exception
  33. :param kwargs:
  34. :return:
  35. '''
  36. message = 'deploying task_id=%s [%s@%s]$ %s ' % (wenv['task_id'], self.user, self.host, command)
  37. current_app.logger.info(message)
  38. try:
  39. if run_mode == self.run_mode_sudo:
  40. result = super(Waller, self).sudo(command, pty=pty, env=self.custom_global_env, **kwargs)
  41. elif run_mode == self.run_mode_local:
  42. current_app.logger.info(self.custom_global_env)
  43. result = super(Waller, self).local(command, pty=pty, warn=True, watchers=[say_yes()], env=self.custom_global_env, **kwargs)
  44. else:
  45. result = super(Waller, self).run(command, pty=pty, warn=True, watchers=[say_yes()], env=self.custom_global_env, **kwargs)
  46. if result.failed:
  47. exitcode, stdout, stderr = result.exited, '', result.stdout
  48. if exception:
  49. raise Exception(stderr)
  50. else:
  51. exitcode, stdout, stderr = 0, result.stdout, ''
  52. message = 'task_id=%s, user:%s host:%s command:%s status:%s, success:%s, error:%s' % (
  53. wenv['task_id'], self.user, self.host, command, exitcode, stdout, stderr
  54. )
  55. # TODO
  56. ws_dict = {
  57. 'user': self.user,
  58. 'host': self.host,
  59. 'cmd': command,
  60. 'status': exitcode,
  61. 'stage': wenv['stage'],
  62. 'sequence': wenv['sequence'],
  63. 'success': stdout,
  64. 'error': stderr,
  65. }
  66. if wenv['console']:
  67. emit('console', {'event': 'task:console', 'data': ws_dict}, room=wenv['task_id'])
  68. RecordModel().save_record(stage=wenv['stage'], sequence=wenv['sequence'], user_id=wenv['user_id'],
  69. task_id=wenv['task_id'], status=exitcode, host=self.host, user=self.user,
  70. command=result.command, success=stdout,
  71. error=stderr)
  72. current_app.logger.info(result)
  73. if exitcode != Code.Ok:
  74. current_app.logger.error(message, exc_info=1)
  75. current_app.logger.exception(result.stdout.strip(), exc_info=1)
  76. return result
  77. return result
  78. except Exception as e:
  79. # TODO 貌似可能的异常有很多种,需要分层才能完美解决 something wrong without e.result
  80. current_app.logger.exception(e)
  81. if hasattr(e, 'message'):
  82. error = e.message
  83. elif hasattr(e, 'result'):
  84. error = e.result
  85. else:
  86. error = str(e)
  87. RecordModel().save_record(stage=wenv['stage'], sequence=wenv['sequence'], user_id=wenv['user_id'],
  88. task_id=wenv['task_id'], status=1, host=self.host, user=self.user,
  89. command=command, success='', error=error)
  90. if hasattr(e, 'reason') and hasattr(e, 'result'):
  91. message = 'task_id=%s, user:%s host:%s command:%s, status=1, reason:%s, result:%s exception:%s' % (
  92. wenv['task_id'], self.user, self.host, command, e.reason, error, e.message
  93. )
  94. else:
  95. message = 'task_id=%s, user:%s host:%s command:%s, status=1, message:%s' % (
  96. wenv['task_id'], self.user, self.host, command, error
  97. )
  98. current_app.logger.error(message, exc_info=1)
  99. # TODO
  100. ws_dict = {
  101. 'user': self.user,
  102. 'host': self.host,
  103. 'cmd': command,
  104. 'status': 1,
  105. 'stage': wenv['stage'],
  106. 'sequence': wenv['sequence'],
  107. 'success': '',
  108. 'error': error,
  109. }
  110. if wenv['console']:
  111. emit('console', {'event': 'console', 'data': ws_dict}, room=wenv['task_id'])
  112. if exception:
  113. raise e
  114. return Result(exited=-1, stderr=error, stdout=error)
  115. def sudo(self, command, wenv=None, **kwargs):
  116. return self.run(command, wenv=wenv, run_mode=self.run_mode_sudo, **kwargs)
  117. def get(self, remote, local=None, wenv=None):
  118. return self.sync(wtype='get', remote=remote, local=local, wenv=wenv)
  119. def put(self, local, remote=None, wenv=None, *args, **kwargs):
  120. return self.sync(wtype='put', local=local, remote=remote, wenv=wenv, *args, **kwargs)
  121. def local(self, command, wenv=None, **kwargs):
  122. return self.run(command, wenv=wenv, run_mode=self.run_mode_local, **kwargs)
  123. def sync(self, wtype, remote=None, local=None, wenv=None):
  124. command = 'scp %s %s@%s:%s' % (local, self.user, self.host, remote) if wtype == 'put' \
  125. else 'scp %s@%s:%s %s' % (self.user, self.host, remote, local)
  126. message = 'deploying task_id=%s [%s@%s]$ %s ' % (wenv['task_id'], self.user, self.host, command)
  127. current_app.logger.info(message)
  128. try:
  129. if wtype == 'put':
  130. result = super(Waller, self).put(local=local, remote=remote)
  131. current_app.logger.info('put: local %s, remote %s', local, remote)
  132. op_user = pwd.getpwuid(os.getuid())[0]
  133. op_host = '127.0.0.1'
  134. else:
  135. result = super(Waller, self).get(remote=remote, local=local)
  136. current_app.logger.info('get: local %s, remote %s', local, remote)
  137. current_app.logger.info('get: orig_local %s, local %s', result.orig_local, result.local)
  138. op_user = self.user
  139. op_host = self.host
  140. current_app.logger.info('put: %s, %s', result, dir(result))
  141. # TODO 可能会有非22端口的问题
  142. RecordModel().save_record(stage=wenv['stage'], sequence=wenv['sequence'], user_id=wenv['user_id'],
  143. task_id=wenv['task_id'], status=0, host=self.host, user=self.user,
  144. command=command, )
  145. # TODO
  146. ws_dict = {
  147. 'user': op_user,
  148. 'host': op_host,
  149. 'cmd': command,
  150. 'status': 1,
  151. 'stage': wenv['stage'],
  152. 'sequence': wenv['sequence'],
  153. 'success': '',
  154. 'error': '',
  155. }
  156. if wenv['console']:
  157. emit('console', {'event': 'task:console', 'data': ws_dict}, room=wenv['task_id'])
  158. return result
  159. except Exception as e:
  160. # TODO 收尾下
  161. current_app.logger.info('put: %s, %s', e, dir(e))
  162. if wtype == 'put':
  163. op_user = pwd.getpwuid(os.getuid())[0]
  164. op_host = '127.0.0.1'
  165. else:
  166. op_user = self.user
  167. op_host = self.host
  168. # TODO command
  169. ws_dict = {
  170. 'user': op_user,
  171. 'host': op_host,
  172. 'cmd': command,
  173. 'status': 1,
  174. 'stage': wenv['stage'],
  175. 'sequence': wenv['sequence'],
  176. 'success': '',
  177. 'error': str(e),
  178. }
  179. current_app.logger.info(ws_dict)
  180. if wenv['console']:
  181. emit('console', {'event': 'task:console', 'data': ws_dict}, room=wenv['task_id'])