Java Websocket发送文件给Vue客户端接收并上传,实现检测U盘插入并将指定文件上传到服务器功能

服务器 0

应用环境:

B/S架构

需求描述:

1、判断U盘接入

2、扫描U盘指定文件,将满足条件的文件发送给服务器

解决思路:

1、因为bs架构,无法获取本机资源,计划在U盘所在服务器部署websocket服务

2、websocket服务扫描u盘,拿到指定文件,使用session.getBasicRemote().sendBinary(data)分批发送二进制流到客户端

3、web端的收到二进制流后将分批数据存到数组

4、数据接收全部完毕后,通过formData将数据提交到服务端进行保存,

5、当时想到websocket直接将文件传给后端服务器,但只想websocket服务只是作与web端的数据传输,具体文件还是由web端与后端服务器进行交互。

定时任务,检查U盘插入找到指定文件,并将文件的二进制流发给客户端

import com.alibaba.fastjson.JSON;import com.alibaba.fastjson.JSONObject;import com.sims.tools.webSocket.WebSocketServer;import org.springframework.scheduling.annotation.EnableScheduling;import org.springframework.scheduling.annotation.Scheduled;import org.springframework.stereotype.Component;import java.io.IOException;import java.nio.file.FileSystems;import java.nio.file.Files;import java.nio.file.Path;import java.nio.file.Paths;import java.util.ArrayList;import java.util.List;@EnableScheduling//开启定时任务@Componentpublic class CheckTask {    private static boolean findFile = false;    private static boolean hasUsb = false;    private static String usbPath = "";    private static List<String> roots = new ArrayList<>();    @Scheduled(cron = "0/5 * * * * ? ")    public void cron()  {        try {            System.out.println("定时任务是否执行:" + !findFile+"--根目录文件夹数"+roots.size());            if(findFile){                //文件已经导入了                System.out.println("文件已经导入了,如果想再导入,拨掉u盘再插入");                List<String> newRoots = new ArrayList<>();                FileSystems.getDefault().getRootDirectories().forEach(root -> {                    newRoots.add(root.toString());                });                if(newRoots.size()<roots.size()){                    //新目录少于现有目录,更新                    findFile = false;                    roots = newRoots;                    usbPath = "";                    System.out.println("U盘被拔出");                }            } else {                //文件未导入,继续检测                System.out.println("文件未导入,继续检测");                if (roots.size() == 0) {                    // 获取系统中的所有文件系统                    FileSystems.getDefault().getRootDirectories().forEach(root -> {                        roots.add(root.toString());                    });                } else {                    List<String> newRoots = new ArrayList<>();                    // 获取系统中的所有文件系统                    FileSystems.getDefault().getRootDirectories().forEach(root -> {                        if ((roots.indexOf(root.toString()) < 0) || (root.toString().equals(usbPath))) {                            System.out.println("U盘已插入,路径为:" + root);                            hasUsb = true;                            usbPath = root.toString();                            try {                                Files.walk(root).forEach(path -> {                                    if (Files.isRegularFile(path)) {                                        if (path.toString().endsWith("erc")) {                                            System.out.println("文件:" + path);                                            JSONObject obj1 = new JSONObject();                                            obj1.put("msgType","sendFileBegin");                                            WebSocketServer.sendInfo(JSON.toJSONString(obj1), "user0");                                            WebSocketServer.sendFile(path.toString(),"user0");                                            JSONObject obj2 = new JSONObject();                                            obj2.put("msgType","sendFileEnd");                                            obj2.put("fileName",path.toString());                                            WebSocketServer.sendInfo(JSON.toJSONString(obj2), "user0");                                            findFile = true;                                        }                                    }                                });                            } catch (IOException e) {                                System.out.println("io错误:" + e.getMessage());                                findFile = false;                            }                        }                        newRoots.add(root.toString());                    });                    if(newRoots.size()< roots.size()){                        //U盘被拔出                        System.out.println("U盘被拔出");                        usbPath = "";                        findFile = false;                    }                    roots = newRoots;                }            }        } catch (Exception e) {            System.out.println("报错了" + e.getMessage());            findFile = false;        }    }}

Websocket服务代码

import lombok.extern.slf4j.Slf4j;import org.springframework.stereotype.Component;import com.alibaba.fastjson.JSON;import com.alibaba.fastjson.JSONObject;import javax.websocket.*;import javax.websocket.server.PathParam;import javax.websocket.server.ServerEndpoint;import java.io.File;import java.io.FileInputStream;import java.io.IOException;import java.net.InetAddress;import java.net.NetworkInterface;import java.net.SocketException;import java.net.UnknownHostException;import java.nio.ByteBuffer;import java.util.concurrent.ConcurrentHashMap;/** * websocket的处理类。 * 作用相当于HTTP请求 * 中的controller */@Component@Slf4j@ServerEndpoint("/websocket/{userId}")public class WebSocketServer {    /**静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。*/    private static int onlineCount = 0;    /**concurrent包的线程安全Set,用来存放每个客户端对应的WebSocket对象。*/    private static ConcurrentHashMap<String,WebSocketServer> webSocketMap = new ConcurrentHashMap<>();    /**与某个客户端的连接会话,需要通过它来给客户端发送数据*/    private Session session;    /**接收userId*/    private String userId = "";    public String getUserId(){        return this.userId;    }    /**     * 连接建立成     * 功调用的方法     */    @OnOpen    public void onOpen(Session session,@PathParam("userId") String userId) {        this.session = session;        this.userId=userId;        if(webSocketMap.containsKey(userId)){            webSocketMap.remove(userId);            //加入set中            webSocketMap.put(userId,this);        }else{            //加入set中            webSocketMap.put(userId,this);            //在线数加1            addOnlineCount();        }        System.out.println("用户连接:"+userId+",当前在线人数为:" + getOnlineCount());        try {            InetAddress ip = InetAddress.getLocalHost();            System.out.println("Current IP address : " + ip.getHostAddress());            NetworkInterface network = NetworkInterface.getByInetAddress(ip);            byte[] mac = network.getHardwareAddress();            System.out.print("Current MAC address : ");            StringBuilder sb = new StringBuilder();            for (int i = 0; i < mac.length; i++) {                sb.append(String.format("%02X%s", mac[i], (i < mac.length - 1) ? "-" : ""));            }            System.out.println(sb);            JSONObject object = new JSONObject();            object.put("ip",ip.getHostAddress());            object.put("mac",sb);            JSONObject obj = new JSONObject();            obj.put("msgType","ipAndMac");            obj.put("result",object);            sendInfo(JSON.toJSONString(obj),this.userId);        } catch (UnknownHostException e) {            e.printStackTrace();        } catch (SocketException e){            e.printStackTrace();        }    }    /**     * 连接关闭     * 调用的方法     */    @OnClose    public void onClose() {        if(webSocketMap.containsKey(userId)){            webSocketMap.remove(userId);            //从set中删除            subOnlineCount();        }        System.out.println("用户退出:"+userId+",当前在线人数为:" + getOnlineCount());    }    /**     * 收到客户端消     * 息后调用的方法     * @param message     * 客户端发送过来的消息     **/    @OnMessage    public void onMessage(String message, Session session) {        System.out.println("用户消息:"+userId+",报文:"+message);        //可以群发消息        //消息保存到数据库、redis        if(message!=null && message.length()>0){            try {                //解析发送的报文                JSONObject jsonObject = JSON.parseObject(message);                //追加发送人(防止串改)                jsonObject.put("fromUserId",this.userId);                String toUserId=jsonObject.getString("toUserId");                //传送给对应toUserId用户的websocket                if(toUserId!=null && toUserId.length()>0 &&webSocketMap.containsKey(toUserId)){                    webSocketMap.get(toUserId).sendMessage(message);                }else{                    //否则不在这个服务器上,发送到mysql或者redis                    System.out.println("请求的userId:"+toUserId+"不在该服务器上");                }            }catch (Exception e){                e.printStackTrace();            }        }    }    /**     * @param session     * @param error     */    @OnError    public void onError(Session session, Throwable error) {        System.out.println("用户错误:"+this.userId+",原因:"+error.getMessage());        error.printStackTrace();    }    /**     * 实现服务     * 器主动推送     */    public void sendMessage(String message) {        try {            this.session.getBasicRemote().sendText(message);        } catch (IOException e) {            e.printStackTrace();        }    }    public void sendByteBuffer(ByteBuffer data) {        System.out.print(data);        try {            session.getBasicRemote().sendBinary(data);        } catch (IOException e) {            e.printStackTrace();        }    }    /**     *发送自定     *义消息     **/    public static void sendInfo(String message, String userId) {        System.out.println("发送消息到:"+userId+",报文:"+message);        if(userId!=null && userId.length()>0 && webSocketMap.containsKey(userId)){            webSocketMap.get(userId).sendMessage(message);        }else{            System.out.println("用户"+userId+",不在线!");        }    }    public static void sendFile(String  path, String userId) {        if(userId!=null && userId.length()>0 && webSocketMap.containsKey(userId)){            try {                File file = new File(path);                FileInputStream fis = new FileInputStream(file);                byte[] buffer = new byte[1024];                int bytesRead;                while ((bytesRead = fis.read(buffer)) != -1) {                    ByteBuffer byteBuffer = ByteBuffer.wrap(buffer, 0, bytesRead);                    webSocketMap.get(userId).sendByteBuffer(byteBuffer);                }                fis.close();            } catch (Exception e) {                e.printStackTrace();            }        }else{            System.out.println("用户"+userId+",不在线!");        }    }    /**     *发送自定     *义消息     **/    public void sendInfoToAll(String message) {        System.out.println("发送报文:"+message);        sendMessage(message);    }    /**     * 获得此时的     * 在线人数     * @return     */    public static synchronized int getOnlineCount() {        return onlineCount;    }    /**     * 在线人     * 数加1     */    public static synchronized void addOnlineCount() {        WebSocketServer.onlineCount++;    }    /**     * 在线人     * 数减1     */    public static synchronized void subOnlineCount() {        WebSocketServer.onlineCount--;    }}

Vue前端代码

websocketOnmessage: function (e){      let data = e.data      let that = this;      console.log("rec====", data);      if (this.isBlob(data)) {        console.log("recdata===", data)        this.recData.push(data);      } else {        let record = JSON.parse(data)        if (record.msgType == "ipAndMac") {          if(!sessionStorage.getItem('ipAndMac')){            let result = record.result;            loginLog(result).then(res=>{              console.log('res',res)              if(res.data.success){                sessionStorage.setItem('ipAndMac',result)              }                      })          }        } else if (record.msgType == "sendFileBegin")  {          //开始接收服务端发送过来的文件数据          this.recData = [];        } else if (record.msgType == "sendFileEnd") {          //文件数据的接收完毕,合并数据并上传到业务服务器          if (this.recData.length == 0) return;          this.$showMsgBox({            caption:"询问",            msg: '检查到待导入文件' + record.fileName + ',是否导入?',            callback:(data) => {              if (data == "yes") {                var formData = new FormData()                formData.append('file', new Blob(this.recData))                formData.append('fileName', record.fileName);                      let url = config.configData.api_url  + "/business/biz/importAllByPath";                utils.httpFile(url,formData).then((res) => {                    if (res.data.success == true) {                           that.$showImport({tableData:res.data.data})                    } else {                        this.$showToast({msg: res.data.message});                        return;                        }                })              }            }          })        }      }          },

特别注意这个写法不要搞错,formData.append('file', new Blob(this.recData)),否则后端接受不到正确的格式数据。

服务端接收前端上传数据代码

@RequestMapping(value = "/importAllByPath", method = RequestMethod.POST)    public Result<?> importAllByPath(HttpServletRequest request, HttpServletResponse response) {        String fileName = request.getParameter("fileName");        MultipartHttpServletRequest multipartRequest = (MultipartHttpServletRequest) request;        Map<String, MultipartFile> fileMap = multipartRequest.getFileMap();        for (Map.Entry<String, MultipartFile> entity : fileMap.entrySet()) {            MultipartFile file = entity.getValue();// 获取上传文件对象            try {                String uuid = StrUtils.getTimeNo("sims_data_");                String path = uploadpath + File.separator + "temp" + File.separator + uuid + File.separator;                File dir = new File(path);                if (!dir.exists()) {                    dir.mkdirs();                }                String sFile = new File(path).getAbsolutePath() + File.separator + uuid + ".erc";                try (FileOutputStream outputStream = new FileOutputStream(sFile)) {                    byte[] bytes = file.getBytes();                    outputStream.write(bytes);                } catch (Exception e) {                    return Result.error("文件导入失败:" + e.getMessage());                }                return Result.ok(result);            } catch (Exception e) {                return Result.error("文件导入失败:" + e.getMessage());            }        }        return Result.ok("导入失败");    }

也许您对下面的内容还感兴趣: