deployer.py 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. # @Author: wushuiyong
  4. # @Created Time : 日 1/ 1 23:43:12 2017
  5. # @Description:
  6. import time
  7. from datetime import datetime
  8. import os
  9. import pwd
  10. import re
  11. from flask import current_app
  12. from flask_socketio import emit
  13. from walle.model.project import ProjectModel
  14. from walle.model.record import RecordModel
  15. from walle.model.task import TaskModel
  16. from walle.service.code import Code
  17. from walle.service.error import WalleError
  18. from walle.service.utils import color_clean
  19. from walle.service.utils import excludes_format, includes_format
  20. from walle.service.notice import Notice
  21. from walle.service.waller import Waller
  22. from walle.service.git.repo import Repo
  23. from flask_login import current_user
  24. class Deployer:
  25. '''
  26. 序列号
  27. '''
  28. stage = 'init'
  29. sequence = 0
  30. stage_prev_deploy = 'prev_deploy'
  31. stage_deploy = 'deploy'
  32. stage_post_deploy = 'post_deploy'
  33. stage_prev_release = 'prev_release'
  34. stage_release = 'release'
  35. stage_post_release = 'post_release'
  36. task_id = '0'
  37. user_id = '0'
  38. taskMdl = None
  39. TaskRecord = None
  40. console = False
  41. custom_global_env = {}
  42. version = datetime.now().strftime('%Y%m%d%H%M%S')
  43. local_codebase, dir_codebase_project, project_name = None, None, None
  44. dir_release, dir_webroot = None, None
  45. connections, success, errors = {}, {}, {}
  46. release_version_tar, previous_release_version, release_version = None, None, None
  47. local = None
  48. def __init__(self, task_id=None, project_id=None, console=False):
  49. self.local_codebase = current_app.config.get('CODE_BASE').rstrip('/') + '/'
  50. self.localhost = Waller(host='127.0.0.1')
  51. self.TaskRecord = RecordModel()
  52. if task_id:
  53. self.task_id = task_id
  54. # task start
  55. current_app.logger.info(self.task_id)
  56. self.taskMdl = TaskModel().item(self.task_id)
  57. self.user_id = self.taskMdl.get('user_id')
  58. self.servers = self.taskMdl.get('servers_info')
  59. self.project_info = self.taskMdl.get('project_info')
  60. # copy to a local version
  61. self.release_version = self.taskMdl.get('link_id') if (self.taskMdl.get("is_rollback")) else \
  62. '{project_id}_{task_id}_{timestamp}'.format(
  63. project_id=self.project_info['id'],
  64. task_id=self.task_id,
  65. timestamp=time.strftime('%Y%m%d_%H%M%S', time.localtime(time.time())),
  66. )
  67. current_app.logger.info(self.taskMdl)
  68. # 将环境变量包在 "" 里,防止特殊字符报错
  69. format_export = lambda val: '"%s"' % str(val).replace('"', '').replace("'", '')
  70. self.custom_global_env = {
  71. 'WEBROOT': str(self.project_info['target_root']),
  72. 'VERSION': str(self.release_version),
  73. 'CURRENT_RELEASE': str(self.project_info['target_releases']),
  74. 'BRANCH': format_export(self.taskMdl.get('branch')),
  75. 'TAG': str(self.taskMdl.get('tag')),
  76. 'COMMIT_ID': str(self.taskMdl.get('commit_id')),
  77. 'PROJECT_NAME': format_export(self.project_info['name']),
  78. 'PROJECT_ID': str(self.project_info['id']),
  79. 'TASK_NAME': format_export(self.taskMdl.get('name')),
  80. 'TASK_ID': str(self.task_id),
  81. 'DEPLOY_USER': str(self.taskMdl.get('user_name')),
  82. 'DEPLOY_TIME': str(time.strftime('%Y%m%d-%H:%M:%S', time.localtime(time.time()))),
  83. }
  84. if self.project_info['task_vars']:
  85. task_vars = [i.strip() for i in self.project_info['task_vars'].split('\n') if i.strip() and not i.strip().startswith('#')]
  86. for var in task_vars:
  87. var_list = var.split('=', 1)
  88. if len(var_list) != 2:
  89. continue
  90. self.custom_global_env[var_list[0].strip()] = var_list[1].strip()
  91. self.localhost.init_env(env=self.custom_global_env)
  92. if project_id:
  93. self.project_id = project_id
  94. self.project_info = ProjectModel(id=project_id).item()
  95. self.servers = self.project_info['servers_info']
  96. self.project_name = self.project_info['id']
  97. self.dir_codebase_project = self.local_codebase + str(self.project_name)
  98. # self.init_repo()
  99. # start to deploy
  100. self.console = console
  101. def config(self, console=None):
  102. return {
  103. 'task_id': self.task_id,
  104. 'user_id': self.user_id,
  105. 'stage': self.stage,
  106. 'sequence': self.sequence,
  107. 'console': console if console is not None else self.console
  108. }
  109. def start(self):
  110. RecordModel().query.filter_by(task_id=self.task_id).delete()
  111. TaskModel().get_by_id(self.task_id).update({'status': TaskModel.status_doing})
  112. self.taskMdl = TaskModel().item(self.task_id)
  113. # ===================== fabric ================
  114. # SocketHandler
  115. def prev_deploy(self):
  116. '''
  117. # TODO
  118. socketio.sleep(0.001)
  119. 1.代码检出前要做的基础工作
  120. - 检查 当前用户
  121. - 检查 python 版本
  122. - 检查 git 版本
  123. - 检查 目录是否存在
  124. - 用户自定义命令
  125. :return:
  126. '''
  127. self.stage = self.stage_prev_deploy
  128. self.sequence = 1
  129. # 检查 目录是否存在
  130. self.init_repo()
  131. # 用户自定义命令
  132. commands = self.project_info['prev_deploy']
  133. if commands:
  134. for command in commands.split('\n'):
  135. if command.strip().startswith('#') or not command.strip():
  136. continue
  137. with self.localhost.cd(self.dir_codebase_project):
  138. result = self.localhost.local(command, wenv=self.config())
  139. def deploy(self):
  140. '''
  141. 2.检出代码
  142. :param project_name:
  143. :return:
  144. '''
  145. self.stage = self.stage_deploy
  146. self.sequence = 2
  147. #
  148. # # copy to a local version
  149. # self.release_version = '%s_%s_%s' % (
  150. # self.project_name, self.task_id, time.strftime('%Y%m%d_%H%M%S', time.localtime(time.time())))
  151. with self.localhost.cd(self.local_codebase):
  152. command = 'cp -rf %s %s' % (self.dir_codebase_project, self.release_version)
  153. current_app.logger.info('cd %s command: %s ', self.dir_codebase_project, command)
  154. result = self.localhost.local(command, wenv=self.config())
  155. # 更新到指定 branch/commit_id 或 tag
  156. repo = Repo(self.local_codebase + self.release_version)
  157. if self.project_info['repo_mode'] == ProjectModel.repo_mode_branch:
  158. repo.checkout_2_commit(branch=self.taskMdl['branch'], commit=self.taskMdl['commit_id'])
  159. else:
  160. repo.checkout_2_tag(tag=self.taskMdl['tag'])
  161. def post_deploy(self):
  162. '''
  163. 3.检出代码后要做的任务
  164. - 用户自定义操作命令
  165. - 代码编译
  166. - 清除日志文件及无用文件
  167. -
  168. - 压缩打包
  169. - 传送到版本库 release
  170. :return:
  171. '''
  172. self.stage = self.stage_post_deploy
  173. self.sequence = 3
  174. # 用户自定义命令
  175. commands = self.project_info['post_deploy']
  176. if commands:
  177. for command in commands.split('\n'):
  178. if command.strip().startswith('#') or not command.strip():
  179. continue
  180. with self.localhost.cd(self.local_codebase + self.release_version):
  181. result = self.localhost.local(command, wenv=self.config())
  182. # 压缩打包
  183. # 排除文件发布
  184. self.release_version_tar = '%s.tgz' % (self.release_version)
  185. with self.localhost.cd(self.local_codebase):
  186. if self.project_info['is_include']:
  187. files = includes_format(self.release_version, self.project_info['excludes'])
  188. else:
  189. files = excludes_format(self.release_version, self.project_info['excludes'])
  190. command = 'tar zcf %s/%s %s' % (self.local_codebase.rstrip('/'), self.release_version_tar, files)
  191. result = self.localhost.local(command, wenv=self.config())
  192. def prev_release(self, waller):
  193. '''
  194. 4.部署代码到目标机器前做的任务
  195. - 检查 webroot 父目录是否存在
  196. :return:
  197. '''
  198. self.stage = self.stage_prev_release
  199. self.sequence = 4
  200. # 检查 target_releases 父目录是否存在
  201. command = 'mkdir -p %s' % (self.project_info['target_releases'])
  202. result = waller.run(command, wenv=self.config())
  203. # TODO md5
  204. # 传送到版本库 release
  205. result = waller.put(self.local_codebase + self.release_version_tar,
  206. remote=self.project_info['target_releases'], wenv=self.config())
  207. current_app.logger.info('command: %s', dir(result))
  208. # 解压
  209. self.release_untar(waller)
  210. # 用户自定义命令
  211. self.prev_release_custom(waller)
  212. def prev_release_custom(self, waller):
  213. # 用户自定义命令
  214. commands = self.project_info['prev_release']
  215. if commands:
  216. for command in commands.split('\n'):
  217. if command.strip().startswith('#') or not command.strip():
  218. continue
  219. # TODO
  220. target_release_version = "%s/%s" % (self.project_info['target_releases'], self.release_version)
  221. with waller.cd(target_release_version):
  222. result = waller.run(command, wenv=self.config())
  223. def release(self, waller):
  224. '''
  225. 5.部署代码到目标机器做的任务
  226. - 打包代码 local
  227. - scp local => remote
  228. - 解压 remote
  229. :return:
  230. '''
  231. self.stage = self.stage_release
  232. self.sequence = 5
  233. with waller.cd(self.project_info['target_releases']):
  234. # 0. get previous link
  235. command = '[ -L %s ] && readlink %s || echo ""' % (self.project_info['target_root'], self.project_info['target_root'])
  236. result = waller.run(command, wenv=self.config(console=False))
  237. self.previous_release_version = os.path.basename(result.stdout).strip()
  238. # 1. create a tmp link dir
  239. current_link_tmp_dir = 'current-tmp-%s' % (self.task_id)
  240. command = 'ln -sfn {library}/{version} {library}/{current_tmp}'.format(
  241. library=self.project_info['target_releases'], version=self.release_version,
  242. current_tmp=current_link_tmp_dir)
  243. result = waller.run(command, wenv=self.config())
  244. # 2. make a soft link from release to tmp link
  245. # 3. move tmp link to webroot
  246. current_link_tmp_dir = '%s/current-tmp-%s' % (self.project_info['target_releases'], self.task_id)
  247. command = 'mv -fT %s %s' % (current_link_tmp_dir, self.project_info['target_root'])
  248. result = waller.run(command, wenv=self.config())
  249. def rollback(self, waller):
  250. '''
  251. 5.部署代码到目标机器做的任务
  252. - 恢复旧版本
  253. :return:
  254. '''
  255. self.stage = self.stage_release
  256. self.sequence = 5
  257. with waller.cd(self.project_info['target_releases']):
  258. # 0. get previous link
  259. command = '[ -L %s ] && readlink %s || echo ""' % (self.project_info['target_root'], self.project_info['target_root'])
  260. result = waller.run(command, wenv=self.config(console=False))
  261. self.previous_release_version = os.path.basename(result.stdout)
  262. # 1. create a tmp link dir
  263. current_link_tmp_dir = '%s/current-tmp-%s' % (self.project_info['target_releases'], self.task_id)
  264. command = 'ln -sfn %s/%s %s' % (
  265. self.project_info['target_releases'], self.release_version, current_link_tmp_dir)
  266. result = waller.run(command, wenv=self.config())
  267. # 2. make a soft link from release to tmp link
  268. # 3. move tmp link to webroot
  269. current_link_tmp_dir = '%s/current-tmp-%s' % (self.project_info['target_releases'], self.task_id)
  270. command = 'mv -fT %s %s' % (current_link_tmp_dir, self.project_info['target_root'])
  271. result = waller.run(command, wenv=self.config())
  272. def release_untar(self, waller):
  273. '''
  274. 解压版本包
  275. :return:
  276. '''
  277. with waller.cd(self.project_info['target_releases']):
  278. command = 'tar zxf %s' % (self.release_version_tar)
  279. result = waller.run(command, wenv=self.config())
  280. def post_release(self, waller):
  281. '''
  282. 6.部署代码到目标机器后要做的任务
  283. - 切换软链
  284. - 重启 nginx
  285. :return:
  286. '''
  287. self.stage = self.stage_post_release
  288. self.sequence = 6
  289. # 用户自定义命令
  290. commands = self.project_info['post_release']
  291. if commands:
  292. for command in commands.split('\n'):
  293. if command.strip().startswith('#') or not command.strip():
  294. continue
  295. # TODO
  296. with waller.cd(self.project_info['target_root']):
  297. pty = False if command.find('nohup') >= 0 else True
  298. result = waller.run(command, wenv=self.config(), pty=pty)
  299. # 个性化,用户重启的不一定是NGINX,可能是tomcat, apache, php-fpm等
  300. # self.post_release_service(waller)
  301. # 清理现场
  302. self.cleanup_remote(waller)
  303. def post_release_service(self, waller):
  304. '''
  305. 代码部署完成后,服务启动工作,如: nginx重启
  306. :param connection:
  307. :return:
  308. '''
  309. with waller.cd(self.project_info['target_root']):
  310. command = 'sudo service nginx restart'
  311. result = waller.run(command, wenv=self.config())
  312. def project_detection(self):
  313. errors = []
  314. # LOCAL_SERVER_USER => git
  315. # LOCAL_SERVER_USER => target_servers
  316. for server_info in self.servers:
  317. waller = Waller(host=server_info['host'], user=server_info['user'], port=server_info['port'])
  318. result = waller.run('id', exception=False, wenv=self.config())
  319. if result.failed:
  320. errors.append({
  321. 'title': '远程目标机器免密码登录失败',
  322. 'why': '远程目标机器:%s 错误:%s' % (server_info['host'], result.stdout),
  323. 'how': '在宿主机中配置免密码登录,把宿主机用户%s的~/.ssh/id_rsa.pub添加到远程目标机器用户%s的~/.ssh/authorized_keys。了解更多:http://walle-web.io/docs/troubleshooting.html' % (
  324. pwd.getpwuid(os.getuid())[0], server_info['host']),
  325. })
  326. # maybe this is no webroot's parent dir
  327. command = '[ -d {webroot} ] || mkdir -p {webroot}'.format(webroot=os.path.basename(self.project_info['target_root']))
  328. result = waller.run(command, exception=False, wenv=self.config(console=False))
  329. # 检查 webroot 父目录是否存在,是否为软链
  330. command = '[ -L "%s" ] && echo "true" || echo "false"' % (self.project_info['target_root'])
  331. result = waller.run(command, exception=False, wenv=self.config())
  332. if result.stdout == 'false':
  333. errors.append({
  334. 'title': '远程目标机器webroot不能是已建好的目录',
  335. 'why': '远程目标机器%s webroot不能是已存在的目录,必须为软链接,你不必新建,walle会自行创建。' % (server_info['host']),
  336. 'how': '手工删除远程目标机器:%s webroot目录:%s' % (server_info['host'], self.project_info['target_root']),
  337. })
  338. # remote release directory
  339. return errors
  340. def list_tag(self):
  341. repo = Repo(self.dir_codebase_project)
  342. repo.init(url=self.project_info['repo_url'])
  343. return repo.tags()
  344. def list_branch(self):
  345. repo = Repo(self.dir_codebase_project)
  346. repo.init(url=self.project_info['repo_url'])
  347. return repo.branches()
  348. def list_commit(self, branch):
  349. repo = Repo(self.dir_codebase_project)
  350. repo.init(url=self.project_info['repo_url'])
  351. return repo.commits(branch)
  352. def init_repo(self):
  353. repo = Repo(self.dir_codebase_project)
  354. repo.init(url=self.project_info['repo_url'])
  355. # @todo 没有做emit
  356. def cleanup_local(self):
  357. # clean local package
  358. command = 'rm -rf {project_id}_{task_id}_*'.format(project_id=self.project_info['id'], task_id=self.task_id)
  359. with self.localhost.cd(self.local_codebase):
  360. result = self.localhost.local(command, wenv=self.config())
  361. def cleanup_remote(self, waller):
  362. command = 'rm -rf {project_id}_{task_id}_*.tgz'.format(project_id=self.project_info['id'], task_id=self.task_id)
  363. with waller.cd(self.project_info['target_releases']):
  364. result = waller.run(command, wenv=self.config())
  365. command = 'ls -t | grep ^{project_id}_ | tail -n +{keep_version_num} | xargs rm -rf'.format(
  366. project_id=self.project_info['id'], keep_version_num=int(self.project_info['keep_version_num']) + 1)
  367. with waller.cd(self.project_info['target_releases']):
  368. result = waller.run(command, wenv=self.config())
  369. def logs(self):
  370. return RecordModel().fetch(task_id=self.task_id)
  371. def end(self, success=True, update_status=True):
  372. if update_status:
  373. status = TaskModel.status_success if success else TaskModel.status_fail
  374. current_app.logger.info('success:%s, status:%s' % (success, status))
  375. TaskModel().get_by_id(self.task_id).update({
  376. 'status': status,
  377. 'link_id': self.release_version,
  378. 'ex_link_id': self.previous_release_version,
  379. })
  380. notice_info = {
  381. 'title': '',
  382. 'username': current_user.username,
  383. 'project_name': self.project_info['name'],
  384. 'task_name': '%s ([%s](%s))' % (self.taskMdl.get('name'), self.task_id, Notice.task_url(project_name=self.project_info['name'], task_id=self.task_id)),
  385. 'branch': self.taskMdl.get('branch'),
  386. 'commit': self.taskMdl.get('commit_id'),
  387. 'tag': self.taskMdl.get('tag'),
  388. 'repo_mode': self.project_info['repo_mode'],
  389. }
  390. notice = Notice.create(self.project_info['notice_type'])
  391. if success:
  392. notice_info['title'] = '上线部署成功'
  393. notice.deploy_task(project_info=self.project_info, notice_info=notice_info)
  394. else:
  395. notice_info['title'] = '上线部署失败'
  396. notice.deploy_task(project_info=self.project_info, notice_info=notice_info)
  397. # 清理本地
  398. self.cleanup_local()
  399. if success:
  400. emit('success', {'event': 'finish', 'data': {'message': '部署完成,辛苦了,为你的努力喝彩!'}}, room=self.task_id)
  401. else:
  402. emit('fail', {'event': 'finish', 'data': {'message': Code.code_msg[Code.deploy_fail]}}, room=self.task_id)
  403. def walle_deploy(self):
  404. self.start()
  405. try:
  406. self.prev_deploy()
  407. self.deploy()
  408. self.post_deploy()
  409. is_all_servers_success = True
  410. for server_info in self.servers:
  411. host = server_info['host']
  412. try:
  413. waller = Waller(host=host, user=server_info['user'], port=server_info['port'], inline_ssh_env=True)
  414. waller.init_env(env=self.custom_global_env)
  415. self.connections[self.task_id] = waller
  416. self.prev_release(self.connections[self.task_id])
  417. self.release(self.connections[self.task_id])
  418. self.post_release(self.connections[self.task_id])
  419. RecordModel().save_record(stage=RecordModel.stage_end, sequence=0, user_id=current_user.id,
  420. task_id=self.task_id, status=RecordModel.status_success, host=host,
  421. user=server_info['user'], command='')
  422. emit('success', {'event': 'finish', 'data': {'host': host, 'message': host + ' 部署完成!'}}, room=self.task_id)
  423. except Exception as e:
  424. is_all_servers_success = False
  425. current_app.logger.exception(e)
  426. self.errors[host] = e.message
  427. RecordModel().save_record(stage=RecordModel.stage_end, sequence=0, user_id=current_user.id,
  428. task_id=self.task_id, status=RecordModel.status_fail, host=host,
  429. user=server_info['user'], command='')
  430. emit('fail', {'event': 'finish', 'data': {'host': host, 'message': host + Code.code_msg[Code.deploy_fail]}}, room=self.task_id)
  431. self.end(is_all_servers_success)
  432. except Exception as e:
  433. self.end(False)
  434. return {'success': self.success, 'errors': self.errors}
  435. def walle_rollback(self):
  436. self.start()
  437. try:
  438. is_all_servers_success = True
  439. for server_info in self.servers:
  440. host = server_info['host']
  441. try:
  442. waller = Waller(host=host, user=server_info['user'], port=server_info['port'], inline_ssh_env=True)
  443. waller.init_env(env=self.custom_global_env)
  444. self.connections[self.task_id] = waller
  445. self.prev_release_custom(self.connections[self.task_id])
  446. self.release(self.connections[self.task_id])
  447. self.post_release(self.connections[self.task_id])
  448. RecordModel().save_record(stage=RecordModel.stage_end, sequence=0, user_id=current_user.id,
  449. task_id=self.task_id, status=RecordModel.status_success, host=host,
  450. user=server_info['user'], command='')
  451. emit('success', {'event': 'finish', 'data': {'host': host, 'message': host + ' 部署完成!'}}, room=self.task_id)
  452. except Exception as e:
  453. is_all_servers_success = False
  454. current_app.logger.exception(e)
  455. self.errors[host] = e.message
  456. RecordModel().save_record(stage=RecordModel.stage_end, sequence=0, user_id=current_user.id,
  457. task_id=self.task_id, status=RecordModel.status_fail, host=host,
  458. user=server_info['user'], command='')
  459. emit('fail', {'event': 'finish', 'data': {'host': host, 'message': host + Code.code_msg[Code.deploy_fail]}}, room=self.task_id)
  460. self.end(is_all_servers_success)
  461. except Exception as e:
  462. self.end(False)
  463. return {'success': self.success, 'errors': self.errors}