deployer.py 23 KB


  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. # @Author: wushuiyong
  4. # @Created Time : 日 1/ 1 23:43:12 2017
  5. # @Description:
  6. import time
  7. from datetime import datetime
  8. import os
  9. import pwd
  10. import re
  11. from flask import current_app
  12. from flask_socketio import emit
  13. from walle.model.project import ProjectModel
  14. from walle.model.record import RecordModel
  15. from walle.model.task import TaskModel
  16. from walle.service.code import Code
  17. from walle.service.error import WalleError
  18. from walle.service.utils import color_clean
  19. from walle.service.utils import excludes_format, includes_format
  20. from walle.service.notice import Notice
  21. from walle.service.waller import Waller
  22. from walle.service.git.repo import Repo
  23. from flask_login import current_user
  24. class Deployer:
  25. '''
  26. 序列号
  27. '''
  28. stage = 'init'
  29. sequence = 0
  30. stage_prev_deploy = 'prev_deploy'
  31. stage_deploy = 'deploy'
  32. stage_post_deploy = 'post_deploy'
  33. stage_prev_release = 'prev_release'
  34. stage_release = 'release'
  35. stage_post_release = 'post_release'
  36. task_id = '0'
  37. user_id = '0'
  38. taskMdl = None
  39. TaskRecord = None
  40. console = False
  41. custom_global_env = {}
  42. version = datetime.now().strftime('%Y%m%d%H%M%S')
  43. local_codebase, dir_codebase_project, project_name = None, None, None
  44. dir_release, dir_webroot = None, None
  45. connections, success, errors = {}, {}, {}
  46. release_version_tar, previous_release_version, release_version = None, None, None
  47. local = None
  48. def __init__(self, task_id=None, project_id=None, console=False):
  49. self.local_codebase = current_app.config.get('CODE_BASE').rstrip('/') + '/'
  50. self.localhost = Waller(host='127.0.0.1')
  51. self.TaskRecord = RecordModel()
  52. if task_id:
  53. self.task_id = task_id
  54. # task start
  55. current_app.logger.info(self.task_id)
  56. self.taskMdl = TaskModel().item(self.task_id)
  57. self.user_id = self.taskMdl.get('user_id')
  58. self.servers = self.taskMdl.get('servers_info')
  59. self.project_info = self.taskMdl.get('project_info')
  60. # copy to a local version
  61. self.release_version = '{project_id}_{task_id}_{timestamp}'.format(
  62. project_id=self.project_info['id'],
  63. task_id=self.task_id,
  64. timestamp=time.strftime('%Y%m%d_%H%M%S', time.localtime(time.time())),
  65. )
  66. current_app.logger.info(self.taskMdl)
  67. # 将环境变量包在 "" 里,防止特殊字符报错
  68. format_export = lambda val: '"%s"' % str(val).replace('"', '').replace("'", '')
  69. self.custom_global_env = {
  70. 'WEBROOT': str(self.project_info['target_root']),
  71. 'VERSION': str(self.release_version),
  72. 'CURRENT_RELEASE': str(self.project_info['target_releases']),
  73. 'BRANCH': format_export(self.taskMdl.get('branch')),
  74. 'TAG': str(self.taskMdl.get('tag')),
  75. 'COMMIT_ID': str(self.taskMdl.get('commit_id')),
  76. 'PROJECT_NAME': format_export(self.project_info['name']),
  77. 'PROJECT_ID': str(self.project_info['id']),
  78. 'TASK_NAME': format_export(self.taskMdl.get('name')),
  79. 'TASK_ID': str(self.task_id),
  80. 'DEPLOY_USER': str(self.taskMdl.get('user_name')),
  81. 'DEPLOY_TIME': str(time.strftime('%Y%m%d-%H:%M:%S', time.localtime(time.time()))),
  82. }
  83. if self.project_info['task_vars']:
  84. task_vars = [i.strip() for i in self.project_info['task_vars'].split('\n') if i.strip() and not i.strip().startswith('#')]
  85. for var in task_vars:
  86. var_list = var.split('=', 1)
  87. if len(var_list) != 2:
  88. continue
  89. self.custom_global_env[var_list[0]] = var_list[1]
  90. self.localhost.init_env(env=self.custom_global_env)
  91. if project_id:
  92. self.project_id = project_id
  93. self.project_info = ProjectModel(id=project_id).item()
  94. self.servers = self.project_info['servers_info']
  95. self.project_name = self.project_info['id']
  96. self.dir_codebase_project = self.local_codebase + str(self.project_name)
  97. # self.init_repo()
  98. # start to deploy
  99. self.console = console
  100. def config(self, console=None):
  101. return {
  102. 'task_id': self.task_id,
  103. 'user_id': self.user_id,
  104. 'stage': self.stage,
  105. 'sequence': self.sequence,
  106. 'console': console if console is not None else self.console
  107. }
  108. def start(self):
  109. RecordModel().query.filter_by(task_id=self.task_id).delete()
  110. TaskModel().get_by_id(self.task_id).update({'status': TaskModel.status_doing})
  111. self.taskMdl = TaskModel().item(self.task_id)
  112. # ===================== fabric ================
  113. # SocketHandler
  114. def prev_deploy(self):
  115. '''
  116. # TODO
  117. socketio.sleep(0.001)
  118. 1.代码检出前要做的基础工作
  119. - 检查 当前用户
  120. - 检查 python 版本
  121. - 检查 git 版本
  122. - 检查 目录是否存在
  123. - 用户自定义命令
  124. :return:
  125. '''
  126. self.stage = self.stage_prev_deploy
  127. self.sequence = 1
  128. # 检查 目录是否存在
  129. self.init_repo()
  130. # 用户自定义命令
  131. commands = self.project_info['prev_deploy']
  132. if commands:
  133. for command in commands.split('\n'):
  134. if command.strip().startswith('#') or not command.strip():
  135. continue
  136. with self.localhost.cd(self.dir_codebase_project):
  137. result = self.localhost.local(command, wenv=self.config())
  138. def deploy(self):
  139. '''
  140. 2.检出代码
  141. :param project_name:
  142. :return:
  143. '''
  144. self.stage = self.stage_deploy
  145. self.sequence = 2
  146. #
  147. # # copy to a local version
  148. # self.release_version = '%s_%s_%s' % (
  149. # self.project_name, self.task_id, time.strftime('%Y%m%d_%H%M%S', time.localtime(time.time())))
  150. with self.localhost.cd(self.local_codebase):
  151. command = 'cp -rf %s %s' % (self.dir_codebase_project, self.release_version)
  152. current_app.logger.info('cd %s command: %s ', self.dir_codebase_project, command)
  153. result = self.localhost.local(command, wenv=self.config())
  154. # 更新到指定 branch/commit_id 或 tag
  155. repo = Repo(self.local_codebase + self.release_version)
  156. if self.project_info['repo_mode'] == ProjectModel.repo_mode_branch:
  157. repo.checkout_2_commit(branch=self.taskMdl['branch'], commit=self.taskMdl['commit_id'])
  158. else:
  159. repo.checkout_2_tag(tag=self.taskMdl['tag'])
  160. def post_deploy(self):
  161. '''
  162. 3.检出代码后要做的任务
  163. - 用户自定义操作命令
  164. - 代码编译
  165. - 清除日志文件及无用文件
  166. -
  167. - 压缩打包
  168. - 传送到版本库 release
  169. :return:
  170. '''
  171. self.stage = self.stage_post_deploy
  172. self.sequence = 3
  173. # 用户自定义命令
  174. commands = self.project_info['post_deploy']
  175. if commands:
  176. for command in commands.split('\n'):
  177. if command.strip().startswith('#') or not command.strip():
  178. continue
  179. with self.localhost.cd(self.local_codebase + self.release_version):
  180. result = self.localhost.local(command, wenv=self.config())
  181. # 压缩打包
  182. # 排除文件发布
  183. self.release_version_tar = '%s.tgz' % (self.release_version)
  184. with self.localhost.cd(self.local_codebase):
  185. if self.project_info['is_include']:
  186. files = includes_format(self.release_version, self.project_info['excludes'])
  187. else:
  188. files = excludes_format(self.release_version, self.project_info['excludes'])
  189. command = 'tar zcf %s/%s %s' % (self.local_codebase.rstrip('/'), self.release_version_tar, files)
  190. result = self.localhost.local(command, wenv=self.config())
  191. def prev_release(self, waller):
  192. '''
  193. 4.部署代码到目标机器前做的任务
  194. - 检查 webroot 父目录是否存在
  195. :return:
  196. '''
  197. self.stage = self.stage_prev_release
  198. self.sequence = 4
  199. # 检查 target_releases 父目录是否存在
  200. command = 'mkdir -p %s' % (self.project_info['target_releases'])
  201. result = waller.run(command, wenv=self.config())
  202. # TODO md5
  203. # 传送到版本库 release
  204. result = waller.put(self.local_codebase + self.release_version_tar,
  205. remote=self.project_info['target_releases'], wenv=self.config())
  206. current_app.logger.info('command: %s', dir(result))
  207. # 解压
  208. self.release_untar(waller)
  209. # 用户自定义命令
  210. self.prev_release_custom(waller)
  211. def prev_release_custom(self, waller):
  212. # 用户自定义命令
  213. commands = self.project_info['prev_release']
  214. if commands:
  215. for command in commands.split('\n'):
  216. if command.strip().startswith('#') or not command.strip():
  217. continue
  218. # TODO
  219. target_release_version = "%s/%s" % (self.project_info['target_releases'], self.release_version)
  220. with waller.cd(target_release_version):
  221. result = waller.run(command, wenv=self.config())
  222. def release(self, waller):
  223. '''
  224. 5.部署代码到目标机器做的任务
  225. - 打包代码 local
  226. - scp local => remote
  227. - 解压 remote
  228. :return:
  229. '''
  230. self.stage = self.stage_release
  231. self.sequence = 5
  232. with waller.cd(self.project_info['target_releases']):
  233. # 0. get previous link
  234. command = '[ -L %s ] && readlink %s || echo ""' % (self.project_info['target_root'], self.project_info['target_root'])
  235. result = waller.run(command, wenv=self.config(console=False))
  236. self.previous_release_version = os.path.basename(result.stdout).strip()
  237. # 1. create a tmp link dir
  238. current_link_tmp_dir = 'current-tmp-%s' % (self.task_id)
  239. command = 'ln -sfn {library}/{version} {library}/{current_tmp}'.format(
  240. library=self.project_info['target_releases'], version=self.release_version,
  241. current_tmp=current_link_tmp_dir)
  242. result = waller.run(command, wenv=self.config())
  243. # 2. make a soft link from release to tmp link
  244. # 3. move tmp link to webroot
  245. current_link_tmp_dir = '%s/current-tmp-%s' % (self.project_info['target_releases'], self.task_id)
  246. command = 'mv -fT %s %s' % (current_link_tmp_dir, self.project_info['target_root'])
  247. result = waller.run(command, wenv=self.config())
  248. def rollback(self, waller):
  249. '''
  250. 5.部署代码到目标机器做的任务
  251. - 恢复旧版本
  252. :return:
  253. '''
  254. self.stage = self.stage_release
  255. self.sequence = 5
  256. with waller.cd(self.project_info['target_releases']):
  257. # 0. get previous link
  258. command = '[ -L %s ] && readlink %s || echo ""' % (self.project_info['target_root'], self.project_info['target_root'])
  259. result = waller.run(command, wenv=self.config(console=False))
  260. self.previous_release_version = os.path.basename(result.stdout)
  261. # 1. create a tmp link dir
  262. current_link_tmp_dir = '%s/current-tmp-%s' % (self.project_info['target_releases'], self.task_id)
  263. command = 'ln -sfn %s/%s %s' % (
  264. self.project_info['target_releases'], self.release_version, current_link_tmp_dir)
  265. result = waller.run(command, wenv=self.config())
  266. # 2. make a soft link from release to tmp link
  267. # 3. move tmp link to webroot
  268. current_link_tmp_dir = '%s/current-tmp-%s' % (self.project_info['target_releases'], self.task_id)
  269. command = 'mv -fT %s %s' % (current_link_tmp_dir, self.project_info['target_root'])
  270. result = waller.run(command, wenv=self.config())
  271. def release_untar(self, waller):
  272. '''
  273. 解压版本包
  274. :return:
  275. '''
  276. with waller.cd(self.project_info['target_releases']):
  277. command = 'tar zxf %s' % (self.release_version_tar)
  278. result = waller.run(command, wenv=self.config())
  279. def post_release(self, waller):
  280. '''
  281. 6.部署代码到目标机器后要做的任务
  282. - 切换软链
  283. - 重启 nginx
  284. :return:
  285. '''
  286. self.stage = self.stage_post_release
  287. self.sequence = 6
  288. # 用户自定义命令
  289. commands = self.project_info['post_release']
  290. if commands:
  291. for command in commands.split('\n'):
  292. if command.strip().startswith('#') or not command.strip():
  293. continue
  294. # TODO
  295. with waller.cd(self.project_info['target_root']):
  296. pty = False if command.find('nohup') >= 0 else True
  297. result = waller.run(command, wenv=self.config(), pty=pty)
  298. # 个性化,用户重启的不一定是NGINX,可能是tomcat, apache, php-fpm等
  299. # self.post_release_service(waller)
  300. # 清理现场
  301. self.cleanup_remote(waller)
  302. def post_release_service(self, waller):
  303. '''
  304. 代码部署完成后,服务启动工作,如: nginx重启
  305. :param connection:
  306. :return:
  307. '''
  308. with waller.cd(self.project_info['target_root']):
  309. command = 'sudo service nginx restart'
  310. result = waller.run(command, wenv=self.config())
  311. def project_detection(self):
  312. errors = []
  313. # LOCAL_SERVER_USER => git
  314. # LOCAL_SERVER_USER => target_servers
  315. for server_info in self.servers:
  316. waller = Waller(host=server_info['host'], user=server_info['user'], port=server_info['port'])
  317. result = waller.run('id', exception=False, wenv=self.config())
  318. if result.failed:
  319. errors.append({
  320. 'title': '远程目标机器免密码登录失败',
  321. 'why': '远程目标机器:%s 错误:%s' % (server_info['host'], result.stdout),
  322. 'how': '在宿主机中配置免密码登录,把宿主机用户%s的~/.ssh/ssh_rsa.pub添加到远程目标机器用户%s的~/.ssh/authorized_keys。了解更多:http://walle-web.io/docs/troubleshooting.html' % (
  323. pwd.getpwuid(os.getuid())[0], server_info['host']),
  324. })
  325. # maybe this is no webroot's parent dir
  326. command = '[ -d {webroot} ] || mkdir -p {webroot}'.format(webroot=os.path.basename(self.project_info['target_root']))
  327. result = waller.run(command, exception=False, wenv=self.config(console=False))
  328. # 检查 webroot 父目录是否存在,是否为软链
  329. command = '[ -L "%s" ] && echo "true" || echo "false"' % (self.project_info['target_root'])
  330. result = waller.run(command, exception=False, wenv=self.config())
  331. if result.stdout == 'false':
  332. errors.append({
  333. 'title': '远程目标机器webroot不能是已建好的目录',
  334. 'why': '远程目标机器%s webroot不能是已存在的目录,必须为软链接,你不必新建,walle会自行创建。' % (server_info['host']),
  335. 'how': '手工删除远程目标机器:%s webroot目录:%s' % (server_info['host'], self.project_info['target_root']),
  336. })
  337. # remote release directory
  338. return errors
  339. def list_tag(self):
  340. repo = Repo(self.dir_codebase_project)
  341. repo.init(url=self.project_info['repo_url'])
  342. return repo.tags()
  343. def list_branch(self):
  344. repo = Repo(self.dir_codebase_project)
  345. repo.init(url=self.project_info['repo_url'])
  346. return repo.branches()
  347. def list_commit(self, branch):
  348. repo = Repo(self.dir_codebase_project)
  349. repo.init(url=self.project_info['repo_url'])
  350. return repo.commits(branch)
  351. def init_repo(self):
  352. repo = Repo(self.dir_codebase_project)
  353. repo.init(url=self.project_info['repo_url'])
  354. # @todo 没有做emit
  355. def cleanup_local(self):
  356. # clean local package
  357. command = 'rm -rf {project_id}_{task_id}_*'.format(project_id=self.project_info['id'], task_id=self.task_id)
  358. with self.localhost.cd(self.local_codebase):
  359. result = self.localhost.local(command, wenv=self.config())
  360. def cleanup_remote(self, waller):
  361. command = 'rm -rf {project_id}_{task_id}_*.tgz'.format(project_id=self.project_info['id'], task_id=self.task_id)
  362. with waller.cd(self.project_info['target_releases']):
  363. result = waller.run(command, wenv=self.config())
  364. command = 'rm -rf `ls -t {project_id}_* | tail -n +{keep_version_num}`'.format(
  365. project_id=self.project_info['id'], keep_version_num=int(self.project_info['keep_version_num']) + 1)
  366. with waller.cd(self.project_info['target_releases']):
  367. result = waller.run(command, wenv=self.config())
  368. def logs(self):
  369. return RecordModel().fetch(task_id=self.task_id)
  370. def end(self, success=True, update_status=True):
  371. if update_status:
  372. status = TaskModel.status_success if success else TaskModel.status_fail
  373. current_app.logger.info('success:%s, status:%s' % (success, status))
  374. TaskModel().get_by_id(self.task_id).update({
  375. 'status': status,
  376. 'link_id': self.release_version,
  377. 'ex_link_id': self.previous_release_version,
  378. })
  379. notice_info = {
  380. 'title': '',
  381. 'username': current_user.username,
  382. 'project_name': self.project_info['name'],
  383. 'task_name': '%s ([%s](%s))' % (self.taskMdl.get('name'), self.task_id, Notice.task_url(project_name=self.project_info['name'], task_id=self.task_id)),
  384. 'branch': self.taskMdl.get('branch'),
  385. 'commit': self.taskMdl.get('commit_id'),
  386. 'tag': self.taskMdl.get('tag'),
  387. 'repo_mode': self.project_info['repo_mode'],
  388. }
  389. notice = Notice.create(self.project_info['notice_type'])
  390. if success:
  391. notice_info['title'] = '上线部署成功'
  392. notice.deploy_task(project_info=self.project_info, notice_info=notice_info)
  393. else:
  394. notice_info['title'] = '上线部署失败'
  395. notice.deploy_task(project_info=self.project_info, notice_info=notice_info)
  396. # 清理本地
  397. self.cleanup_local()
  398. if success:
  399. emit('success', {'event': 'finish', 'data': {'message': '部署完成,辛苦了,为你的努力喝彩!'}}, room=self.task_id)
  400. else:
  401. emit('fail', {'event': 'finish', 'data': {'message': Code.code_msg[Code.deploy_fail]}}, room=self.task_id)
  402. def walle_deploy(self):
  403. self.start()
  404. try:
  405. self.prev_deploy()
  406. self.deploy()
  407. self.post_deploy()
  408. is_all_servers_success = True
  409. for server_info in self.servers:
  410. host = server_info['host']
  411. try:
  412. waller = Waller(host=host, user=server_info['user'], port=server_info['port'], inline_ssh_env=True)
  413. waller.init_env(env=self.custom_global_env)
  414. self.connections[host] = waller
  415. self.prev_release(self.connections[host])
  416. self.release(self.connections[host])
  417. self.post_release(self.connections[host])
  418. RecordModel().save_record(stage=RecordModel.stage_end, sequence=0, user_id=current_user.id,
  419. task_id=self.task_id, status=RecordModel.status_success, host=host,
  420. user=server_info['user'], command='')
  421. emit('success', {'event': 'finish', 'data': {'host': host, 'message': host + ' 部署完成!'}}, room=self.task_id)
  422. except Exception as e:
  423. is_all_servers_success = False
  424. current_app.logger.exception(e)
  425. self.errors[host] = e.message
  426. RecordModel().save_record(stage=RecordModel.stage_end, sequence=0, user_id=current_user.id,
  427. task_id=self.task_id, status=RecordModel.status_fail, host=host,
  428. user=server_info['user'], command='')
  429. emit('fail', {'event': 'finish', 'data': {'host': host, 'message': host + Code.code_msg[Code.deploy_fail]}}, room=self.task_id)
  430. self.end(is_all_servers_success)
  431. except Exception as e:
  432. self.end(False)
  433. return {'success': self.success, 'errors': self.errors}
  434. def walle_rollback(self):
  435. self.start()
  436. try:
  437. is_all_servers_success = True
  438. self.release_version = self.taskMdl.get('link_id')
  439. for server_info in self.servers:
  440. host = server_info['host']
  441. try:
  442. waller = Waller(host=host, user=server_info['user'], port=server_info['port'], inline_ssh_env=True)
  443. waller.init_env(env=self.custom_global_env)
  444. self.connections[host] = waller
  445. self.prev_release_custom(self.connections[host])
  446. self.release(self.connections[host])
  447. self.post_release(self.connections[host])
  448. RecordModel().save_record(stage=RecordModel.stage_end, sequence=0, user_id=current_user.id,
  449. task_id=self.task_id, status=RecordModel.status_success, host=host,
  450. user=server_info['user'], command='')
  451. emit('success', {'event': 'finish', 'data': {'host': host, 'message': host + ' 部署完成!'}}, room=self.task_id)
  452. except Exception as e:
  453. is_all_servers_success = False
  454. current_app.logger.exception(e)
  455. self.errors[host] = e.message
  456. RecordModel().save_record(stage=RecordModel.stage_end, sequence=0, user_id=current_user.id,
  457. task_id=self.task_id, status=RecordModel.status_fail, host=host,
  458. user=server_info['user'], command='')
  459. emit('fail', {'event': 'finish', 'data': {'host': host, 'message': host + Code.code_msg[Code.deploy_fail]}}, room=self.task_id)
  460. self.end(is_all_servers_success)
  461. except Exception as e:
  462. self.end(False)
  463. return {'success': self.success, 'errors': self.errors}