应用环境:
B/S架构
需求描述:
1、判断U盘接入
2、扫描U盘指定文件,将满足条件的文件发送给服务器
解决思路:
1、因为bs架构,无法获取本机资源,计划在U盘所在服务器部署websocket服务
2、websocket服务扫描u盘,拿到指定文件,使用session.getBasicRemote().sendBinary(data)分批发送二进制流到客户端
3、web端的收到二进制流后将分批数据存到数组
4、数据接收全部完毕后,通过formData将数据提交到服务端进行保存,
5、当时想到websocket直接将文件传给后端服务器,但只想websocket服务只是作与web端的数据传输,具体文件还是由web端与后端服务器进行交互。
定时任务,检查U盘插入找到指定文件,并将文件的二进制流发给客户端
import com.alibaba.fastjson.JSON;import com.alibaba.fastjson.JSONObject;import com.sims.tools.webSocket.WebSocketServer;import org.springframework.scheduling.annotation.EnableScheduling;import org.springframework.scheduling.annotation.Scheduled;import org.springframework.stereotype.Component;import java.io.IOException;import java.nio.file.FileSystems;import java.nio.file.Files;import java.nio.file.Path;import java.nio.file.Paths;import java.util.ArrayList;import java.util.List;@EnableScheduling//开启定时任务@Componentpublic class CheckTask { private static boolean findFile = false; private static boolean hasUsb = false; private static String usbPath = ""; private static List<String> roots = new ArrayList<>(); @Scheduled(cron = "0/5 * * * * ? ") public void cron() { try { System.out.println("定时任务是否执行:" + !findFile+"--根目录文件夹数"+roots.size()); if(findFile){ //文件已经导入了 System.out.println("文件已经导入了,如果想再导入,拨掉u盘再插入"); List<String> newRoots = new ArrayList<>(); FileSystems.getDefault().getRootDirectories().forEach(root -> { newRoots.add(root.toString()); }); if(newRoots.size()<roots.size()){ //新目录少于现有目录,更新 findFile = false; roots = newRoots; usbPath = ""; System.out.println("U盘被拔出"); } } else { //文件未导入,继续检测 System.out.println("文件未导入,继续检测"); if (roots.size() == 0) { // 获取系统中的所有文件系统 FileSystems.getDefault().getRootDirectories().forEach(root -> { roots.add(root.toString()); }); } else { List<String> newRoots = new ArrayList<>(); // 获取系统中的所有文件系统 FileSystems.getDefault().getRootDirectories().forEach(root -> { if ((roots.indexOf(root.toString()) < 0) || (root.toString().equals(usbPath))) { System.out.println("U盘已插入,路径为:" + root); hasUsb = true; usbPath = root.toString(); try { Files.walk(root).forEach(path -> { if (Files.isRegularFile(path)) { if (path.toString().endsWith("erc")) { System.out.println("文件:" + path); JSONObject obj1 = new JSONObject(); obj1.put("msgType","sendFileBegin"); WebSocketServer.sendInfo(JSON.toJSONString(obj1), "user0"); WebSocketServer.sendFile(path.toString(),"user0"); JSONObject obj2 = new JSONObject(); obj2.put("msgType","sendFileEnd"); obj2.put("fileName",path.toString()); WebSocketServer.sendInfo(JSON.toJSONString(obj2), "user0"); findFile = true; } } }); } catch (IOException e) { System.out.println("io错误:" + e.getMessage()); findFile = false; } } newRoots.add(root.toString()); }); if(newRoots.size()< roots.size()){ //U盘被拔出 System.out.println("U盘被拔出"); usbPath = ""; findFile = false; } roots = newRoots; } } } catch (Exception e) { System.out.println("报错了" + e.getMessage()); findFile = false; } }}
Websocket服务代码
import lombok.extern.slf4j.Slf4j;import org.springframework.stereotype.Component;import com.alibaba.fastjson.JSON;import com.alibaba.fastjson.JSONObject;import javax.websocket.*;import javax.websocket.server.PathParam;import javax.websocket.server.ServerEndpoint;import java.io.File;import java.io.FileInputStream;import java.io.IOException;import java.net.InetAddress;import java.net.NetworkInterface;import java.net.SocketException;import java.net.UnknownHostException;import java.nio.ByteBuffer;import java.util.concurrent.ConcurrentHashMap;/** * websocket的处理类。 * 作用相当于HTTP请求 * 中的controller */@Component@Slf4j@ServerEndpoint("/websocket/{userId}")public class WebSocketServer { /**静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。*/ private static int onlineCount = 0; /**concurrent包的线程安全Set,用来存放每个客户端对应的WebSocket对象。*/ private static ConcurrentHashMap<String,WebSocketServer> webSocketMap = new ConcurrentHashMap<>(); /**与某个客户端的连接会话,需要通过它来给客户端发送数据*/ private Session session; /**接收userId*/ private String userId = ""; public String getUserId(){ return this.userId; } /** * 连接建立成 * 功调用的方法 */ @OnOpen public void onOpen(Session session,@PathParam("userId") String userId) { this.session = session; this.userId=userId; if(webSocketMap.containsKey(userId)){ webSocketMap.remove(userId); //加入set中 webSocketMap.put(userId,this); }else{ //加入set中 webSocketMap.put(userId,this); //在线数加1 addOnlineCount(); } System.out.println("用户连接:"+userId+",当前在线人数为:" + getOnlineCount()); try { InetAddress ip = InetAddress.getLocalHost(); System.out.println("Current IP address : " + ip.getHostAddress()); NetworkInterface network = NetworkInterface.getByInetAddress(ip); byte[] mac = network.getHardwareAddress(); System.out.print("Current MAC address : "); StringBuilder sb = new StringBuilder(); for (int i = 0; i < mac.length; i++) { sb.append(String.format("%02X%s", mac[i], (i < mac.length - 1) ? "-" : "")); } System.out.println(sb); JSONObject object = new JSONObject(); object.put("ip",ip.getHostAddress()); object.put("mac",sb); JSONObject obj = new JSONObject(); obj.put("msgType","ipAndMac"); obj.put("result",object); sendInfo(JSON.toJSONString(obj),this.userId); } catch (UnknownHostException e) { e.printStackTrace(); } catch (SocketException e){ e.printStackTrace(); } } /** * 连接关闭 * 调用的方法 */ @OnClose public void onClose() { if(webSocketMap.containsKey(userId)){ webSocketMap.remove(userId); //从set中删除 subOnlineCount(); } System.out.println("用户退出:"+userId+",当前在线人数为:" + getOnlineCount()); } /** * 收到客户端消 * 息后调用的方法 * @param message * 客户端发送过来的消息 **/ @OnMessage public void onMessage(String message, Session session) { System.out.println("用户消息:"+userId+",报文:"+message); //可以群发消息 //消息保存到数据库、redis if(message!=null && message.length()>0){ try { //解析发送的报文 JSONObject jsonObject = JSON.parseObject(message); //追加发送人(防止串改) jsonObject.put("fromUserId",this.userId); String toUserId=jsonObject.getString("toUserId"); //传送给对应toUserId用户的websocket if(toUserId!=null && toUserId.length()>0 &&webSocketMap.containsKey(toUserId)){ webSocketMap.get(toUserId).sendMessage(message); }else{ //否则不在这个服务器上,发送到mysql或者redis System.out.println("请求的userId:"+toUserId+"不在该服务器上"); } }catch (Exception e){ e.printStackTrace(); } } } /** * @param session * @param error */ @OnError public void onError(Session session, Throwable error) { System.out.println("用户错误:"+this.userId+",原因:"+error.getMessage()); error.printStackTrace(); } /** * 实现服务 * 器主动推送 */ public void sendMessage(String message) { try { this.session.getBasicRemote().sendText(message); } catch (IOException e) { e.printStackTrace(); } } public void sendByteBuffer(ByteBuffer data) { System.out.print(data); try { session.getBasicRemote().sendBinary(data); } catch (IOException e) { e.printStackTrace(); } } /** *发送自定 *义消息 **/ public static void sendInfo(String message, String userId) { System.out.println("发送消息到:"+userId+",报文:"+message); if(userId!=null && userId.length()>0 && webSocketMap.containsKey(userId)){ webSocketMap.get(userId).sendMessage(message); }else{ System.out.println("用户"+userId+",不在线!"); } } public static void sendFile(String path, String userId) { if(userId!=null && userId.length()>0 && webSocketMap.containsKey(userId)){ try { File file = new File(path); FileInputStream fis = new FileInputStream(file); byte[] buffer = new byte[1024]; int bytesRead; while ((bytesRead = fis.read(buffer)) != -1) { ByteBuffer byteBuffer = ByteBuffer.wrap(buffer, 0, bytesRead); webSocketMap.get(userId).sendByteBuffer(byteBuffer); } fis.close(); } catch (Exception e) { e.printStackTrace(); } }else{ System.out.println("用户"+userId+",不在线!"); } } /** *发送自定 *义消息 **/ public void sendInfoToAll(String message) { System.out.println("发送报文:"+message); sendMessage(message); } /** * 获得此时的 * 在线人数 * @return */ public static synchronized int getOnlineCount() { return onlineCount; } /** * 在线人 * 数加1 */ public static synchronized void addOnlineCount() { WebSocketServer.onlineCount++; } /** * 在线人 * 数减1 */ public static synchronized void subOnlineCount() { WebSocketServer.onlineCount--; }}
Vue前端代码
websocketOnmessage: function (e){ let data = e.data let that = this; console.log("rec====", data); if (this.isBlob(data)) { console.log("recdata===", data) this.recData.push(data); } else { let record = JSON.parse(data) if (record.msgType == "ipAndMac") { if(!sessionStorage.getItem('ipAndMac')){ let result = record.result; loginLog(result).then(res=>{ console.log('res',res) if(res.data.success){ sessionStorage.setItem('ipAndMac',result) } }) } } else if (record.msgType == "sendFileBegin") { //开始接收服务端发送过来的文件数据 this.recData = []; } else if (record.msgType == "sendFileEnd") { //文件数据的接收完毕,合并数据并上传到业务服务器 if (this.recData.length == 0) return; this.$showMsgBox({ caption:"询问", msg: '检查到待导入文件' + record.fileName + ',是否导入?', callback:(data) => { if (data == "yes") { var formData = new FormData() formData.append('file', new Blob(this.recData)) formData.append('fileName', record.fileName); let url = config.configData.api_url + "/business/biz/importAllByPath"; utils.httpFile(url,formData).then((res) => { if (res.data.success == true) { that.$showImport({tableData:res.data.data}) } else { this.$showToast({msg: res.data.message}); return; } }) } } }) } } },
特别注意这个写法不要搞错,formData.append('file', new Blob(this.recData)),否则后端接受不到正确的格式数据。
服务端接收前端上传数据代码
@RequestMapping(value = "/importAllByPath", method = RequestMethod.POST) public Result<?> importAllByPath(HttpServletRequest request, HttpServletResponse response) { String fileName = request.getParameter("fileName"); MultipartHttpServletRequest multipartRequest = (MultipartHttpServletRequest) request; Map<String, MultipartFile> fileMap = multipartRequest.getFileMap(); for (Map.Entry<String, MultipartFile> entity : fileMap.entrySet()) { MultipartFile file = entity.getValue();// 获取上传文件对象 try { String uuid = StrUtils.getTimeNo("sims_data_"); String path = uploadpath + File.separator + "temp" + File.separator + uuid + File.separator; File dir = new File(path); if (!dir.exists()) { dir.mkdirs(); } String sFile = new File(path).getAbsolutePath() + File.separator + uuid + ".erc"; try (FileOutputStream outputStream = new FileOutputStream(sFile)) { byte[] bytes = file.getBytes(); outputStream.write(bytes); } catch (Exception e) { return Result.error("文件导入失败:" + e.getMessage()); } return Result.ok(result); } catch (Exception e) { return Result.error("文件导入失败:" + e.getMessage()); } } return Result.ok("导入失败"); }