deployer.py 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. # @Author: wushuiyong
  4. # @Created Time : 日 1/ 1 23:43:12 2017
  5. # @Description:
  6. import time
  7. from datetime import datetime
  8. import os
  9. import pwd
  10. import re
  11. from flask import current_app
  12. from flask_socketio import emit
  13. from walle.model.project import ProjectModel
  14. from walle.model.record import RecordModel
  15. from walle.model.task import TaskModel
  16. from walle.service.code import Code
  17. from walle.service.error import WalleError
  18. from walle.service.utils import color_clean
  19. from walle.service.utils import excludes_format, includes_format
  20. from walle.service.notice import Notice
  21. from walle.service.waller import Waller
  22. from walle.service.git.repo import Repo
  23. from flask_login import current_user
  24. class Deployer:
  25. '''
  26. 序列号
  27. '''
  28. stage = 'init'
  29. sequence = 0
  30. stage_prev_deploy = 'prev_deploy'
  31. stage_deploy = 'deploy'
  32. stage_post_deploy = 'post_deploy'
  33. stage_prev_release = 'prev_release'
  34. stage_release = 'release'
  35. stage_post_release = 'post_release'
  36. task_id = '0'
  37. user_id = '0'
  38. taskMdl = None
  39. TaskRecord = None
  40. console = False
  41. custom_global_env = {}
  42. version = datetime.now().strftime('%Y%m%d%H%M%S')
  43. local_codebase, dir_codebase_project, project_name = None, None, None
  44. dir_release, dir_webroot = None, None
  45. connections, success, errors = {}, {}, {}
  46. release_version_tar, previous_release_version, release_version = None, None, None
  47. local = None
  48. def __init__(self, task_id=None, project_id=None, console=False):
  49. self.local_codebase = current_app.config.get('CODE_BASE').rstrip('/') + '/'
  50. self.localhost = Waller(host='127.0.0.1')
  51. self.TaskRecord = RecordModel()
  52. if task_id:
  53. self.task_id = task_id
  54. # task start
  55. current_app.logger.info(self.task_id)
  56. self.taskMdl = TaskModel().item(self.task_id)
  57. self.user_id = self.taskMdl.get('user_id')
  58. self.servers = self.taskMdl.get('servers_info')
  59. self.project_info = self.taskMdl.get('project_info')
  60. # copy to a local version
  61. self.release_version = '{project_id}_{task_id}_{timestamp}'.format(
  62. project_id=self.project_info['id'],
  63. task_id=self.task_id,
  64. timestamp=time.strftime('%Y%m%d_%H%M%S', time.localtime(time.time())),
  65. )
  66. current_app.logger.info(self.taskMdl)
  67. self.custom_global_env = {
  68. 'WEBROOT': str(self.project_info['target_root']),
  69. 'VERSION': str(self.release_version),
  70. 'CURRENT_RELEASE': str(self.project_info['target_releases']),
  71. 'BRANCH': str(self.taskMdl.get('branch')),
  72. 'TAG': str(self.taskMdl.get('tag')),
  73. 'COMMIT_ID': str(self.taskMdl.get('commit_id')),
  74. 'PROJECT_NAME': str(self.project_info['name']).replace('"', '').replace("'", '').replace(" ", '_'),
  75. 'PROJECT_ID': str(self.project_info['id']),
  76. 'TASK_NAME': str(self.taskMdl.get('name')).replace('"', '').replace("'", '').replace(" ", '_'),
  77. 'TASK_ID': str(self.task_id),
  78. 'DEPLOY_USER': str(self.taskMdl.get('user_name')),
  79. 'DEPLOY_TIME': str(time.strftime('%Y%m%d-%H:%M:%S', time.localtime(time.time()))),
  80. }
  81. if self.project_info['task_vars']:
  82. task_vars = [i.strip() for i in self.project_info['task_vars'].split('\n') if i.strip() and not i.strip().startswith('#')]
  83. for var in task_vars:
  84. var_list = var.split('=', 1)
  85. if len(var_list) != 2:
  86. continue
  87. self.custom_global_env[var_list[0]] = var_list[1]
  88. self.localhost.init_env(env=self.custom_global_env)
  89. if project_id:
  90. self.project_id = project_id
  91. self.project_info = ProjectModel(id=project_id).item()
  92. self.servers = self.project_info['servers_info']
  93. self.project_name = self.project_info['id']
  94. self.dir_codebase_project = self.local_codebase + str(self.project_name)
  95. # self.init_repo()
  96. # start to deploy
  97. self.console = console
  98. def config(self, console=None):
  99. return {
  100. 'task_id': self.task_id,
  101. 'user_id': self.user_id,
  102. 'stage': self.stage,
  103. 'sequence': self.sequence,
  104. 'console': console if console is not None else self.console
  105. }
  106. def start(self):
  107. RecordModel().query.filter_by(task_id=self.task_id).delete()
  108. TaskModel().get_by_id(self.task_id).update({'status': TaskModel.status_doing})
  109. self.taskMdl = TaskModel().item(self.task_id)
  110. # ===================== fabric ================
  111. # SocketHandler
  112. def prev_deploy(self):
  113. '''
  114. # TODO
  115. socketio.sleep(0.001)
  116. 1.代码检出前要做的基础工作
  117. - 检查 当前用户
  118. - 检查 python 版本
  119. - 检查 git 版本
  120. - 检查 目录是否存在
  121. - 用户自定义命令
  122. :return:
  123. '''
  124. self.stage = self.stage_prev_deploy
  125. self.sequence = 1
  126. # 检查 目录是否存在
  127. self.init_repo()
  128. # 用户自定义命令
  129. commands = self.project_info['prev_deploy']
  130. if commands:
  131. for command in commands.split('\n'):
  132. if command.strip().startswith('#') or not command.strip():
  133. continue
  134. with self.localhost.cd(self.dir_codebase_project):
  135. result = self.localhost.local(command, wenv=self.config())
  136. def deploy(self):
  137. '''
  138. 2.检出代码
  139. :param project_name:
  140. :return:
  141. '''
  142. self.stage = self.stage_deploy
  143. self.sequence = 2
  144. #
  145. # # copy to a local version
  146. # self.release_version = '%s_%s_%s' % (
  147. # self.project_name, self.task_id, time.strftime('%Y%m%d_%H%M%S', time.localtime(time.time())))
  148. with self.localhost.cd(self.local_codebase):
  149. command = 'cp -rf %s %s' % (self.dir_codebase_project, self.release_version)
  150. current_app.logger.info('cd %s command: %s ', self.dir_codebase_project, command)
  151. result = self.localhost.local(command, wenv=self.config())
  152. # 更新到指定 branch/commit_id 或 tag
  153. repo = Repo(self.local_codebase + self.release_version)
  154. if self.project_info['repo_mode'] == ProjectModel.repo_mode_branch:
  155. repo.checkout_2_commit(branch=self.taskMdl['branch'], commit=self.taskMdl['commit_id'])
  156. else:
  157. repo.checkout_2_tag(tag=self.taskMdl['tag'])
  158. def post_deploy(self):
  159. '''
  160. 3.检出代码后要做的任务
  161. - 用户自定义操作命令
  162. - 代码编译
  163. - 清除日志文件及无用文件
  164. -
  165. - 压缩打包
  166. - 传送到版本库 release
  167. :return:
  168. '''
  169. self.stage = self.stage_post_deploy
  170. self.sequence = 3
  171. # 用户自定义命令
  172. commands = self.project_info['post_deploy']
  173. if commands:
  174. for command in commands.split('\n'):
  175. if command.strip().startswith('#') or not command.strip():
  176. continue
  177. with self.localhost.cd(self.local_codebase + self.release_version):
  178. result = self.localhost.local(command, wenv=self.config())
  179. # 压缩打包
  180. # 排除文件发布
  181. self.release_version_tar = '%s.tgz' % (self.release_version)
  182. with self.localhost.cd(self.local_codebase):
  183. if self.project_info['is_include']:
  184. files = includes_format(self.release_version, self.project_info['excludes'])
  185. else:
  186. files = excludes_format(self.release_version, self.project_info['excludes'])
  187. command = 'tar zcf %s/%s %s' % (self.local_codebase.rstrip('/'), self.release_version_tar, files)
  188. result = self.localhost.local(command, wenv=self.config())
  189. def prev_release(self, waller):
  190. '''
  191. 4.部署代码到目标机器前做的任务
  192. - 检查 webroot 父目录是否存在
  193. :return:
  194. '''
  195. self.stage = self.stage_prev_release
  196. self.sequence = 4
  197. # 检查 target_releases 父目录是否存在
  198. command = 'mkdir -p %s' % (self.project_info['target_releases'])
  199. result = waller.run(command, wenv=self.config())
  200. # TODO md5
  201. # 传送到版本库 release
  202. result = waller.put(self.local_codebase + self.release_version_tar,
  203. remote=self.project_info['target_releases'], wenv=self.config())
  204. current_app.logger.info('command: %s', dir(result))
  205. # 解压
  206. self.release_untar(waller)
  207. # 用户自定义命令
  208. self.prev_release_custom(waller)
  209. def prev_release_custom(self, waller):
  210. # 用户自定义命令
  211. commands = self.project_info['prev_release']
  212. if commands:
  213. for command in commands.split('\n'):
  214. if command.strip().startswith('#') or not command.strip():
  215. continue
  216. # TODO
  217. target_release_version = "%s/%s" % (self.project_info['target_releases'], self.release_version)
  218. with waller.cd(target_release_version):
  219. result = waller.run(command, wenv=self.config())
  220. def release(self, waller):
  221. '''
  222. 5.部署代码到目标机器做的任务
  223. - 打包代码 local
  224. - scp local => remote
  225. - 解压 remote
  226. :return:
  227. '''
  228. self.stage = self.stage_release
  229. self.sequence = 5
  230. with waller.cd(self.project_info['target_releases']):
  231. # 0. get previous link
  232. command = '[ -L %s ] && readlink %s || echo ""' % (self.project_info['target_root'], self.project_info['target_root'])
  233. result = waller.run(command, wenv=self.config(console=False))
  234. self.previous_release_version = os.path.basename(result.stdout).strip()
  235. # 1. create a tmp link dir
  236. current_link_tmp_dir = 'current-tmp-%s' % (self.task_id)
  237. command = 'ln -sfn {library}/{version} {library}/{current_tmp}'.format(
  238. library=self.project_info['target_releases'], version=self.release_version,
  239. current_tmp=current_link_tmp_dir)
  240. result = waller.run(command, wenv=self.config())
  241. # 2. make a soft link from release to tmp link
  242. # 3. move tmp link to webroot
  243. current_link_tmp_dir = '%s/current-tmp-%s' % (self.project_info['target_releases'], self.task_id)
  244. command = 'mv -fT %s %s' % (current_link_tmp_dir, self.project_info['target_root'])
  245. result = waller.run(command, wenv=self.config())
  246. def rollback(self, waller):
  247. '''
  248. 5.部署代码到目标机器做的任务
  249. - 恢复旧版本
  250. :return:
  251. '''
  252. self.stage = self.stage_release
  253. self.sequence = 5
  254. with waller.cd(self.project_info['target_releases']):
  255. # 0. get previous link
  256. command = '[ -L %s ] && readlink %s || echo ""' % (self.project_info['target_root'], self.project_info['target_root'])
  257. result = waller.run(command, wenv=self.config(console=False))
  258. self.previous_release_version = os.path.basename(result.stdout)
  259. # 1. create a tmp link dir
  260. current_link_tmp_dir = '%s/current-tmp-%s' % (self.project_info['target_releases'], self.task_id)
  261. command = 'ln -sfn %s/%s %s' % (
  262. self.project_info['target_releases'], self.release_version, current_link_tmp_dir)
  263. result = waller.run(command, wenv=self.config())
  264. # 2. make a soft link from release to tmp link
  265. # 3. move tmp link to webroot
  266. current_link_tmp_dir = '%s/current-tmp-%s' % (self.project_info['target_releases'], self.task_id)
  267. command = 'mv -fT %s %s' % (current_link_tmp_dir, self.project_info['target_root'])
  268. result = waller.run(command, wenv=self.config())
  269. def release_untar(self, waller):
  270. '''
  271. 解压版本包
  272. :return:
  273. '''
  274. with waller.cd(self.project_info['target_releases']):
  275. command = 'tar zxf %s' % (self.release_version_tar)
  276. result = waller.run(command, wenv=self.config())
  277. def post_release(self, waller):
  278. '''
  279. 6.部署代码到目标机器后要做的任务
  280. - 切换软链
  281. - 重启 nginx
  282. :return:
  283. '''
  284. self.stage = self.stage_post_release
  285. self.sequence = 6
  286. # 用户自定义命令
  287. commands = self.project_info['post_release']
  288. if commands:
  289. for command in commands.split('\n'):
  290. if command.strip().startswith('#') or not command.strip():
  291. continue
  292. # TODO
  293. with waller.cd(self.project_info['target_root']):
  294. pty = False if command.find('nohup') >= 0 else True
  295. result = waller.run(command, wenv=self.config(), pty=pty)
  296. # 个性化,用户重启的不一定是NGINX,可能是tomcat, apache, php-fpm等
  297. # self.post_release_service(waller)
  298. # 清理现场
  299. self.cleanup_remote(waller)
  300. def post_release_service(self, waller):
  301. '''
  302. 代码部署完成后,服务启动工作,如: nginx重启
  303. :param connection:
  304. :return:
  305. '''
  306. with waller.cd(self.project_info['target_root']):
  307. command = 'sudo service nginx restart'
  308. result = waller.run(command, wenv=self.config())
  309. def project_detection(self):
  310. errors = []
  311. # LOCAL_SERVER_USER => git
  312. # LOCAL_SERVER_USER => target_servers
  313. for server_info in self.servers:
  314. waller = Waller(host=server_info['host'], user=server_info['user'], port=server_info['port'])
  315. result = waller.run('id', exception=False, wenv=self.config())
  316. if result.failed:
  317. errors.append({
  318. 'title': '远程目标机器免密码登录失败',
  319. 'why': '远程目标机器:%s 错误:%s' % (server_info['host'], result.stdout),
  320. 'how': '在宿主机中配置免密码登录,把宿主机用户%s的~/.ssh/ssh_rsa.pub添加到远程目标机器用户%s的~/.ssh/authorized_keys。了解更多:http://walle-web.io/docs/troubleshooting.html' % (
  321. pwd.getpwuid(os.getuid())[0], server_info['host']),
  322. })
  323. # maybe this is no webroot's parent dir
  324. command = '[ -d {webroot} ] || mkdir -p {webroot}'.format(webroot=os.path.basename(self.project_info['target_root']))
  325. result = waller.run(command, exception=False, wenv=self.config(console=False))
  326. # 检查 webroot 父目录是否存在,是否为软链
  327. command = '[ -L "%s" ] && echo "true" || echo "false"' % (self.project_info['target_root'])
  328. result = waller.run(command, exception=False, wenv=self.config())
  329. if result.stdout == 'false':
  330. errors.append({
  331. 'title': '远程目标机器webroot不能是已建好的目录',
  332. 'why': '远程目标机器%s webroot不能是已存在的目录,必须为软链接,你不必新建,walle会自行创建。' % (server_info['host']),
  333. 'how': '手工删除远程目标机器:%s webroot目录:%s' % (server_info['host'], self.project_info['target_root']),
  334. })
  335. # remote release directory
  336. return errors
  337. def list_tag(self):
  338. repo = Repo(self.dir_codebase_project)
  339. repo.init(url=self.project_info['repo_url'])
  340. return repo.tags()
  341. def list_branch(self):
  342. repo = Repo(self.dir_codebase_project)
  343. repo.init(url=self.project_info['repo_url'])
  344. return repo.branches()
  345. def list_commit(self, branch):
  346. repo = Repo(self.dir_codebase_project)
  347. repo.init(url=self.project_info['repo_url'])
  348. return repo.commits(branch)
  349. def init_repo(self):
  350. repo = Repo(self.dir_codebase_project)
  351. repo.init(url=self.project_info['repo_url'])
  352. # @todo 没有做emit
  353. def cleanup_local(self):
  354. # clean local package
  355. command = 'rm -rf {project_id}_{task_id}_*'.format(project_id=self.project_info['id'], task_id=self.task_id)
  356. with self.localhost.cd(self.local_codebase):
  357. result = self.localhost.local(command, wenv=self.config())
  358. def cleanup_remote(self, waller):
  359. command = 'rm -rf {project_id}_{task_id}_*.tgz'.format(project_id=self.project_info['id'], task_id=self.task_id)
  360. with waller.cd(self.project_info['target_releases']):
  361. result = waller.run(command, wenv=self.config())
  362. command = 'rm -rf `ls -t {project_id}_* | tail -n +{keep_version_num}`'.format(
  363. project_id=self.project_info['id'], keep_version_num=int(self.project_info['keep_version_num']) + 1)
  364. with waller.cd(self.project_info['target_releases']):
  365. result = waller.run(command, wenv=self.config())
  366. def logs(self):
  367. return RecordModel().fetch(task_id=self.task_id)
  368. def end(self, success=True, update_status=True):
  369. if update_status:
  370. status = TaskModel.status_success if success else TaskModel.status_fail
  371. current_app.logger.info('success:%s, status:%s' % (success, status))
  372. TaskModel().get_by_id(self.task_id).update({
  373. 'status': status,
  374. 'link_id': self.release_version,
  375. 'ex_link_id': self.previous_release_version,
  376. })
  377. notice_info = {
  378. 'title': '',
  379. 'username': current_user.username,
  380. 'project_name': self.project_info['name'],
  381. 'task_name': '%s ([%s](%s))' % (self.taskMdl.get('name'), self.task_id, Notice.task_url(project_name=self.project_info['name'], task_id=self.task_id)),
  382. 'branch': self.taskMdl.get('branch'),
  383. 'commit': self.taskMdl.get('commit_id'),
  384. 'tag': self.taskMdl.get('tag'),
  385. 'repo_mode': self.project_info['repo_mode'],
  386. }
  387. notice = Notice.create(self.project_info['notice_type'])
  388. if success:
  389. notice_info['title'] = '上线部署成功'
  390. notice.deploy_task(project_info=self.project_info, notice_info=notice_info)
  391. else:
  392. notice_info['title'] = '上线部署失败'
  393. notice.deploy_task(project_info=self.project_info, notice_info=notice_info)
  394. # 清理本地
  395. self.cleanup_local()
  396. if success:
  397. emit('success', {'event': 'finish', 'data': {'message': '部署完成,辛苦了,为你的努力喝彩!'}}, room=self.task_id)
  398. else:
  399. emit('fail', {'event': 'finish', 'data': {'message': Code.code_msg[Code.deploy_fail]}}, room=self.task_id)
  400. def walle_deploy(self):
  401. self.start()
  402. try:
  403. self.prev_deploy()
  404. self.deploy()
  405. self.post_deploy()
  406. is_all_servers_success = True
  407. for server_info in self.servers:
  408. host = server_info['host']
  409. try:
  410. waller = Waller(host=host, user=server_info['user'], port=server_info['port'], inline_ssh_env=True)
  411. waller.init_env(env=self.custom_global_env)
  412. self.connections[host] = waller
  413. self.prev_release(self.connections[host])
  414. self.release(self.connections[host])
  415. self.post_release(self.connections[host])
  416. RecordModel().save_record(stage=RecordModel.stage_end, sequence=0, user_id=current_user.id,
  417. task_id=self.task_id, status=RecordModel.status_success, host=host,
  418. user=server_info['user'], command='')
  419. emit('success', {'event': 'finish', 'data': {'host': host, 'message': host + ' 部署完成!'}}, room=self.task_id)
  420. except Exception as e:
  421. is_all_servers_success = False
  422. current_app.logger.exception(e)
  423. self.errors[host] = e.message
  424. RecordModel().save_record(stage=RecordModel.stage_end, sequence=0, user_id=current_user.id,
  425. task_id=self.task_id, status=RecordModel.status_fail, host=host,
  426. user=server_info['user'], command='')
  427. emit('fail', {'event': 'finish', 'data': {'host': host, 'message': host + Code.code_msg[Code.deploy_fail]}}, room=self.task_id)
  428. self.end(is_all_servers_success)
  429. except Exception as e:
  430. self.end(False)
  431. return {'success': self.success, 'errors': self.errors}
  432. def walle_rollback(self):
  433. self.start()
  434. try:
  435. is_all_servers_success = True
  436. self.release_version = self.taskMdl.get('link_id')
  437. for server_info in self.servers:
  438. host = server_info['host']
  439. try:
  440. waller = Waller(host=host, user=server_info['user'], port=server_info['port'], inline_ssh_env=True)
  441. waller.init_env(env=self.custom_global_env)
  442. self.connections[host] = waller
  443. self.prev_release_custom(self.connections[host])
  444. self.release(self.connections[host])
  445. self.post_release(self.connections[host])
  446. RecordModel().save_record(stage=RecordModel.stage_end, sequence=0, user_id=current_user.id,
  447. task_id=self.task_id, status=RecordModel.status_success, host=host,
  448. user=server_info['user'], command='')
  449. emit('success', {'event': 'finish', 'data': {'host': host, 'message': host + ' 部署完成!'}}, room=self.task_id)
  450. except Exception as e:
  451. is_all_servers_success = False
  452. current_app.logger.exception(e)
  453. self.errors[host] = e.message
  454. RecordModel().save_record(stage=RecordModel.stage_end, sequence=0, user_id=current_user.id,
  455. task_id=self.task_id, status=RecordModel.status_fail, host=host,
  456. user=server_info['user'], command='')
  457. emit('fail', {'event': 'finish', 'data': {'host': host, 'message': host + Code.code_msg[Code.deploy_fail]}}, room=self.task_id)
  458. self.end(is_all_servers_success)
  459. except Exception as e:
  460. self.end(False)
  461. return {'success': self.success, 'errors': self.errors}