Selaa lähdekoodia

websocket add todo

meolu 6 vuotta sitten
vanhempi
commit
c36bd16a71
3 muutettua tiedostoa jossa 47 lisäystä ja 164 poistoa
  1. 4 5
      walle/app.py
  2. 22 131
      walle/service/deployer.py
  3. 21 28
      walle/service/waller.py

+ 4 - 5
walle/app.py

@@ -7,7 +7,7 @@ import sys
 from flask import Flask, render_template, current_app
 from flask_login import current_user
 from flask_restful import Api
-from flask_socketio import emit, join_room
+from flask_socketio import emit, join_room, send
 from walle import commands
 from walle.api import access as AccessAPI
 from walle.api import api as BaseAPI
@@ -210,15 +210,14 @@ def register_socketio(app):
     def deploy(message):
         task = message['task']
         emit('console', {'event': 'task:console', 'data': {}}, room=task)
-        from walle.service.deployer import DeploySocketIO
-        wi = DeploySocketIO(12)
-        ret = wi.deploy()
+        from walle.service.deployer import Deployer
+        wi = Deployer(task_id=task)
+        ret = wi.walle_deploy()
 
     @socketio.on('logs', namespace=namespace)
     def logs(message):
         current_app.logger.info(message)
         task = message['task']
-        emit('console', {'event': 'task:console', 'data': {'logs':'logs'}}, room=task)
         walle_socket = WalleSocketIO(room=task)
         walle_socket.logs()
 

+ 22 - 131
walle/service/deployer.py

@@ -16,125 +16,7 @@ from walle.model import deploy as TaskModel
 from walle.service.waller import Waller
 from walle.model.deploy import ProjectModel
 from flask_socketio import emit
-
-# import fabric2.exceptions.GroupException
-
-class DeploySocketIO:
-
-    '''
-    序列号
-    '''
-    stage = '0'
-
-    sequence = 0
-    stage_prev_deploy = 'prev_deploy'
-    stage_deploy = 'deploy'
-    stage_post_deploy = 'post_deploy'
-
-    stage_prev_release = 'prev_release'
-    stage_release = 'release'
-    stage_post_release = 'post_release'
-
-    task_id = '0'
-    user_id = '0'
-    taskMdl = None
-    TaskRecord = None
-
-    version = datetime.now().strftime('%Y%m%d%H%M%s')
-    project_name = 'walden'
-    dir_codebase = '/tmp/walle/codebase/'
-    dir_codebase_project = dir_codebase + project_name
-
-    # 定义远程机器
-    # env.hosts = ['172.16.0.231', '172.16.0.177']
-
-    dir_release = None
-    dir_webroot = None
-
-    connections, success, errors = {}, {}, {}
-    release_version_tar, release_version = None, None
-    local, websocket = None, None
-    def __init__(self, task_id=None, project_id=None, websocket=None):
-        self.local = Waller(host=current_app.config.get('LOCAL_SERVER_HOST'),
-                            user=current_app.config.get('LOCAL_SERVER_USER'),
-                            port=current_app.config.get('LOCAL_SERVER_PORT'))
-        self.TaskRecord = TaskModel.TaskRecordModel()
-        current_app.logger.info('DeploySocketIO.__init__ before')
-        emit('console', {'event': 'pusher:console', 'data': {'msg': 'init task'}})
-        current_app.logger.info('DeploySocketIO.__init__ after')
-        # if websocket:
-        #     websocket.send_updates(__name__)
-        #     self.websocket = websocket
-        if task_id:
-            self.task_id = task_id
-            self.taskMdl = TaskModel.TaskModel().item(self.task_id)
-            self.user_id = self.taskMdl.get('user_id')
-            self.servers = self.taskMdl.get('servers_info')
-            self.task = self.taskMdl.get('target_user')
-            self.project_info = self.taskMdl.get('project_info')
-        if project_id:
-            self.project_id = project_id
-            self.project_info = ProjectModel(id=project_id).item()
-
-    def config(self):
-        return {'task_id': self.task_id, 'user_id': self.user_id, 'stage': self.stage, 'sequence': self.sequence,
-                'websocket': self.websocket}
-
-    # ===================== fabric ================
-    # SocketHandler
-    def deploy(self):
-        '''
-        1.代码检出前要做的基础工作
-        - 检查 当前用户
-        - 检查 python 版本
-        - 检查 git 版本
-        - 检查 目录是否存在
-        - 用户自定义命令
-
-        :return:
-        '''
-        self.stage = self.stage_prev_deploy
-        self.sequence = 1
-
-        # TODO remove
-        # result = self.local.run('sleep 30', wenv=self.config())
-
-        # 检查 当前用户
-        command = 'whoami'
-        current_app.logger.info(command)
-        emit('console', {'event': 'task:console', 'data': {'commad':  command}}, room=self.task_id)
-
-        result = self.local.run(command, wenv=self.config())
-
-        # 检查 python 版本
-        command = 'python --version'
-        result = self.local.run(command, wenv=self.config())
-        current_app.logger.info(command)
-
-        # 检查 git 版本
-        command = 'git --version'
-        result = self.local.run(command, wenv=self.config())
-        current_app.logger.info(command)
-
-        # 检查 目录是否存在
-        command = 'mkdir -p %s' % (self.dir_codebase_project)
-        # TODO remove
-        current_app.logger.info(command)
-        result = self.local.run(command, wenv=self.config())
-
-        # 用户自定义命令
-        command = self.project_info['prev_deploy']
-        current_app.logger.info(command)
-        with self.local.cd(self.dir_codebase_project):
-            result = self.local.run(command, wenv=self.config())
-
-            # SocketHandler.send_to_all({
-            #     'type': 'user',
-            #     'id': 33,
-            #     'host': env.host_string,
-            #     'command': command,
-            #     'message': result.stdout,
-            # })
+from walle.service.extensions import socketio
 
 
 class Deployer:
@@ -171,16 +53,14 @@ class Deployer:
 
     connections, success, errors = {}, {}, {}
     release_version_tar, release_version = None, None
-    local, websocket = None, None
+    local = None
 
-    def __init__(self, task_id=None, project_id=None, websocket=None):
+    def __init__(self, task_id=None, project_id=None):
         self.local = Waller(host=current_app.config.get('LOCAL_SERVER_HOST'),
                             user=current_app.config.get('LOCAL_SERVER_USER'),
                             port=current_app.config.get('LOCAL_SERVER_PORT'))
         self.TaskRecord = TaskModel.TaskRecordModel()
-        if websocket:
-            websocket.send_updates(__name__)
-            self.websocket = websocket
+
         if task_id:
             self.task_id = task_id
             self.taskMdl = TaskModel.TaskModel().item(self.task_id)
@@ -193,13 +73,14 @@ class Deployer:
             self.project_info = ProjectModel(id=project_id).item()
 
     def config(self):
-        return {'task_id': self.task_id, 'user_id': self.user_id, 'stage': self.stage, 'sequence': self.sequence,
-                'websocket': self.websocket}
+        return {'task_id': self.task_id, 'user_id': self.user_id, 'stage': self.stage, 'sequence': self.sequence}
 
     # ===================== fabric ================
     # SocketHandler
     def prev_deploy(self):
         '''
+        # TODO
+        socketio.sleep(0.001)
         1.代码检出前要做的基础工作
         - 检查 当前用户
         - 检查 python 版本
@@ -217,9 +98,8 @@ class Deployer:
 
         # 检查 当前用户
         command = 'whoami'
-        self.websocket.send_updates(command)
         current_app.logger.info(command)
-
+        socketio.emit('console', {'event': 'task:console', 'data': {'cmd':command}}, room=self.task_id, ignore_queue=True)
         result = self.local.run(command, wenv=self.config())
 
         # 检查 python 版本
@@ -232,6 +112,7 @@ class Deployer:
         result = self.local.run(command, wenv=self.config())
         current_app.logger.info(command)
 
+        time.sleep(3)
         # 检查 目录是否存在
         command = 'mkdir -p %s' % (self.dir_codebase_project)
         # TODO remove
@@ -259,6 +140,8 @@ class Deployer:
         :param project_name:
         :return:
         '''
+        # TODO
+        socketio.sleep(0.001)
         self.stage = self.stage_deploy
         self.sequence = 2
 
@@ -321,12 +204,16 @@ class Deployer:
         - 传送到版本库 release
         :return:
         '''
+        # TODO
+        socketio.sleep(0.001)
         self.stage = self.stage_post_deploy
         self.sequence = 3
 
         # 用户自定义命令
         command = self.project_info['post_deploy']
         current_app.logger.info(command)
+        current_app.logger.info(self.dir_codebase)
+        current_app.logger.info(self.release_version)
         with self.local.cd(self.dir_codebase + self.release_version):
             result = self.local.run(command, wenv=self.config())
 
@@ -342,6 +229,8 @@ class Deployer:
         - 检查 webroot 父目录是否存在
         :return:
         '''
+        # TODO
+        socketio.sleep(0.001)
         self.stage = self.stage_prev_release
         self.sequence = 4
 
@@ -359,7 +248,7 @@ class Deployer:
         # 传送到版本库 release
         current_app.logger.info('/tmp/walle/codebase/' + self.release_version_tar)
         result = waller.put('/tmp/walle/codebase/' + self.release_version_tar,
-                            remote=self.project_info['target_releases'])
+                            remote=self.project_info['target_releases'], wenv=self.config())
         current_app.logger.info('command: %s', dir(result))
 
         # 解压
@@ -373,6 +262,8 @@ class Deployer:
         - 解压 remote
         :return:
         '''
+        # TODO
+        socketio.sleep(0.001)
         self.stage = self.stage_release
         self.sequence = 5
 
@@ -395,6 +286,8 @@ class Deployer:
         解压版本包
         :return:
         '''
+        # TODO
+        socketio.sleep(0.001)
         with waller.cd(self.project_info['target_releases']):
             command = 'tar zxf %s' % (self.release_version_tar)
             result = waller.run(command, wenv=self.config())
@@ -474,8 +367,6 @@ class Deployer:
         return None
 
     def walle_deploy(self):
-
-
         self.prev_deploy()
         self.deploy()
         self.post_deploy()

+ 21 - 28
walle/service/waller.py

@@ -53,7 +53,6 @@ class Waller(Connection):
             }
             emit('console', {'event': 'task:console', 'data': ws_dict}, room=wenv['task_id'])
 
-                # wenv['websocket'].send_updates(ws_dict)
             TaskRecordModel().save_record(stage=wenv['stage'], sequence=wenv['sequence'], user_id=wenv['user_id'],
                                           task_id=wenv['task_id'], status=result.exited, host=self.host, user=self.user,
                                           command=result.command,success=result.stdout.strip(), error=result.stderr.strip())
@@ -124,36 +123,30 @@ class Waller(Connection):
             current_app.logger.info(message)
 
             # TODO
-            if wenv.has_key('websocket') and wenv['websocket']:
-
-                ws_dict = {
-                    'host': self.host,
-                    'cmd': command,
-                    'status': 1,
-                    'stage': wenv['stage'],
-                    'sequence': wenv['sequence'],
-                    'success': '',
-                    'error': result.stderr.strip(),
-                }
-                # wenv['websocket'].send_updates(ws_dict)
+            ws_dict = {
+                'host': self.host,
+                'cmd': command,
+                'status': 1,
+                'stage': wenv['stage'],
+                'sequence': wenv['sequence'],
+                'success': '',
+                'error': result.stderr.strip(),
+            }
+            emit('console', {'event': 'task:console', 'data': ws_dict}, room=wenv['task_id'])
 
             return result
         except Exception, e:
             # TODO 收尾下
             current_app.logger.info('put: %s, %s', e, dir(e))
 
-
-            # TODO
-            if wenv.has_key('websocket') and wenv['websocket']:
-
-                # TODO command
-                ws_dict = {
-                    'host': self.host,
-                    'cmd': 'command',
-                    'status': 1,
-                    'stage': wenv['stage'],
-                    'sequence': wenv['sequence'],
-                    'success': '',
-                    'error': e.message,
-                }
-                # wenv['websocket'].send_updates(ws_dict)
+            # TODO command
+            ws_dict = {
+                'host': self.host,
+                'cmd': 'command',
+                'status': 1,
+                'stage': wenv['stage'],
+                'sequence': wenv['sequence'],
+                'success': '',
+                'error': e.message,
+            }
+            emit('console', {'event': 'task:console', 'data': ws_dict}, room=wenv['task_id'])