java socket接受大数据 java socket 接收消息

admin2024-06-04  12

前言

本篇文章将涉及以下内容:

  • IO实现Java Socket通信
  • NIO实现Java Socket通信

阅读本文之前最好了解过:

  • Java IO
  • Java NIO
  • Java Concurrency
  • TCP/IP协议

TCP 套接字

TCP套接字是指IP号+端口号来识别一个应用程序,从而实现端到端的通讯。其实一个套接字也可以被多个应用程序使用,但是通常来说承载的是一个应用程序的流量。建立在TCP连接之上最著名的协议为HTTP,我们日常生活中使用的浏览器访问网页通常都是使用HTTP协议来实现的。

先来了解一下通过TCP套接字实现客户端和服务器端的通信。

在TCP客户端发出请求之前,服务器会创建新的套接字(socket),并将套接字绑定到某个端口上去(bind),默认情况下HTTP服务的端口号为80。绑定完成后允许套接字进行连接(listen)并等待连接(accept)。这里的accept方法会挂起当前的进程直到有Socket连接。

在服务器准备就绪后,客户端就可以发起Socket连接。客户端获取服务器的Socket套接字(IP号:端口号),并新建一个本地的套接字。然后连同本地的套接字发送到服务器上。

服务器accept该请求并读取该请求。这里面包括有TCP的三次连接过程。连接建立之后,客户端发送HTTP请求并等待响应。服务端根据HTTP报文返回响应,并关闭连接。

Web Server

当下的Web服务器能够同时支持数千条连接,一个客户端可能向服务器打开一条或多条连接,这些连接的使用状态各不相同,使用率也差异很大。如何有效的利用服务器资源提供低延时的服务成了每个服务器都需要考虑的问题。根据服务器的处理方式,可以分为以下4种服务器,我们也将分别对其进行简单的实现。

  • 单线程服务器
  • 多进程及多线程服务器
  • 复用IO服务器
  • 复用的多线程服务器

单线程服务器

一次只处理一个请求,直到其完成为止。一个事务处理结束后,才会去处理下一条连接。实现简单,但是性能堪忧。

多进程及多线程服务器

可以根据需要创建,或预先创建一下线程/进程。可以为每条连接分配一个线程/进程。但是当强求数量过多时,过多的线程会导致内存和系统资源的浪费。

复用I/O服务器

在复用结构中,会同时监视所有连接上的活动,当连接状态发生变化时,就对那条连接进行少量的处理。处理结束后,就将连接返回到开放连接列表中,等待下一次状态的变化。之后在有事情可做时才会对连接进行处理。在空闲连接上等待的时候不会绑定线程和进程。

复用的多线程服务器

多个线程(对应多个CPU)中的每一个都在观察打开的连接(或是打开连接中的一个子集)。并对每条连接的状态变化时执行任务。

Socket通信基本实现

根据我们上面讲述的Socket通信的步骤,在Java中我们可以按照以下方式逐步建立连接:

首先开启服务器端的SocketServer并且将其绑定到一个端口等待Socket连接:

ServerSocket serverSocket = new ServerSocket(PORT_ID:int);
Socket socket = serverSocket.accept();

当没有Socket连接时,服务器会在accept方法处阻塞。

然后我们在客户端新建一个Socket套接字并且连接服务器:

Socket socket = new Socket(SERVER_SOCKET_IP, SERVER_SOCKET_PORT);
socket.setSoTimeout(100000);

如果连接失败的话,将会抛出异常说明服务器当前不可以使用。
连接成功给的话,客户端就可以获取Socket的输入流和输出流并发送消息。写入Socket的输出流的信息将会先存储在客户端本地的缓存队列中,满足一定条件后会flush到服务器的输入流。服务器获取输入后可以解析输入的数据,并且将响应内容写入服务器的输出流并返回客户端。最后客户端从输入流读取数据。

客户端获取Socket输入输出流,这里将字节流封装为字符流。

//获取Socket的输出流,用来发送数据到服务端
PrintStream out = new PrintStream(socket.getOutputStream());
//获取Socket的输入流,用来接收从服务端发送过来的数据
BufferedReader buf =  new BufferedReader(new InputStreamReader(socket.getInputStream()));

客户端发送数据并等待响应

String str = "hello world";
out.println(str);
String echo = buf.readLine();
System.out.println("收到消息:" + echo);

这里需要注意的是,IO流是阻塞式IO,因此在读取服务端响应的过程中(即buf.reaLine()这一行)会阻塞直到收到服务器响应。

客户端发送结束之后不要忘了关闭IO和Socket通信

out.close();
buf.close();
socket.close();

服务器对消息的处理和客户端类似,后面会贴上完整代码。

Java Socket通信阻塞式通信实现

这里我们对上述的理论进行简单的实现。这里我们实现一个简单的聊天室,只不过其中一方是Server角色而另一个为Client角色。二者都通过System.in流输入数据,并发送给对方。正如我们前面所说,IO流的通信是阻塞式的,因此在等待对方响应的过程中,进程将会挂起,我们这时候输入的数据将要等到下一轮会话中才能被读取。

client端

import java.io.*;
import java.net.Socket;
import java.net.SocketTimeoutException;

public class SocketClient {

    public static void send(String server, int port){
        try {
            Socket socket = new Socket(server, port);
            socket.setSoTimeout(100000);

            System.out.println("正在连接服务器");

            //从控制台读入数据
            BufferedReader input = new BufferedReader(new InputStreamReader(System.in));

            //获取Socket的输出流,用来发送数据到服务端
            PrintStream out = new PrintStream(socket.getOutputStream());
            //获取Socket的输入流,用来接收从服务端发送过来的数据
            BufferedReader buf =  new BufferedReader(new InputStreamReader(socket.getInputStream()));
            boolean running = true;
            while(running){
                System.out.print("输入信息:");
                String str = input.readLine();
                out.println(str);

                if("bye".equals(str)){
                    running = false;
                }else{
                    try{
                        //从服务器端接收数据有个时间限制(系统自设,也可以自己设置),超过了这个时间,便会抛出该异常
                        String echo = buf.readLine();
                        System.out.println("收到消息:" + echo);
                    }catch(SocketTimeoutException e){
                        System.out.println("Time out, No response");
                    }
                }
            }

            input.close();
            socket.close();

        } catch (IOException e) {
            e.printStackTrace();
        } finally {
        }
    }

    public static void main(String[] args){
        send("127.0.0.1", 2048);
    }
}

Server端

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

public class SocketServer {

    public static void main(String[] args) throws IOException {
        //服务端在2048端口监听客户端请求的TCP连接
        ServerSocket server = new ServerSocket(2048);
        Socket client = null;
        boolean f = true;
        while(f){
            //等待客户端的连接,如果没有获取连接
            client = server.accept();
            System.out.println("与客户端连接成功!");
            //为每个客户端连接开启一个线程
            new Thread(new ServerThread(client)).start();

        }
        server.close();
    }

}

服务器处理数据

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.net.Socket;

public class ServerThread implements Runnable{
    private Socket client = null;
    public ServerThread(Socket client){
        this.client = client;
    }

    @Override
    public void run() {
        try{
            //获取Socket的输出流,用来向客户端发送数据
            PrintStream out = new PrintStream(client.getOutputStream());

            //获取Socket的输入流,用来接收从客户端发送过来的数据
            BufferedReader buf = new BufferedReader(new InputStreamReader(client.getInputStream()));

            BufferedReader serverResponse = new BufferedReader(new InputStreamReader(System.in));
            boolean flag =true;
            while(flag){
                //接收从客户端发送过来的数据
                String str =  buf.readLine();
                System.out.println("收到消息:" + str);
                if(str == null || "".equals(str)){
                    flag = false;
                }else{
                    if("bye".equals(str)){
                        flag = false;
                    }else{
                        //将接收到的字符串前面加上echo,发送到对应的客户端
                        System.out.print("发送回复:");
                        String response  = serverResponse.readLine();
                        out.println(response);
                    }
                }
            }
            out.close();
            client.close();
        }catch(Exception e){
            e.printStackTrace();
        }
    }
}

可以和小伙伴试试看,分别启动SocketServerSocketClient并进行通信。不过前提是你们两个需要在一个局域网中。

Java实现单线程服务器

上面的服务器其实只在主线程监听了一个Socket连接,并在30秒之后将其自动关闭了。我们将实现一个经典的单线程服务器。原理和上面相似,这里我们可以直接通过向服务器发送HTTP请求来验证该服务器的运行。

import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;

public class SingleThreadServer implements Runnable{

    private ServerSocket serverSocket;

    public SingleThreadServer(ServerSocket serverSocket){
        this.serverSocket = serverSocket;
    }
    @Override
    public void run() {
        Socket socket = null;
        try{
            while (!Thread.interrupted()){
                socket = serverSocket.accept();

                //谷歌浏览器每次会发送两个请求
                //一次用于获取html
                //一次用于获取favicon
                //如果获取favicon成功就缓存,否则会一直请求获得favicon
                //而火狐浏览器第一次也会发出这两个请求
                //在获得favicon失败后就不会继续尝试获取favicon
                //因此使用谷歌浏览器访问该Server的话,你会看到 连接成功 被打印两次
                System.out.println("连接成功");
                process(socket);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                socket.close();
                serverSocket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private void process(Socket socket){
        try {

            InputStreamReader inputStreamReader = null;
            BufferedOutputStream bufferedOutputStream = null;
            try{
                inputStreamReader = new InputStreamReader(socket.getInputStream());
                bufferedOutputStream = new BufferedOutputStream(socket.getOutputStream());

                //这里无法正常读取输入流,因为在没有遇到EOF之前,流会任务socket输入尚未结束,将会继续等待直到socket中断
                //所以这里我们将暂时不读取Socket的输入流中的内容。
                //int size;
                //char[] buffer = new char[1024];
                //StringBuilder stringBuilder = new StringBuilder();
                //while ((size = inputStreamReader.read(buffer)) > 0){
                  //  stringBuilder.append(buffer, 0, size);
                //}


                byte[] responseDocument = "<html><body> Hello World </body></html>".getBytes("UTF-8");
                byte[] responseHeader = ("HTTP/1.1 200 OK\r\nContent-Type: text/html; charset=UTF-8\r\nContent-Length: " + responseDocument.length + "\r\n\r\n").getBytes("UTF-8");

                bufferedOutputStream.write(responseHeader);
                bufferedOutputStream.write(responseDocument);
            }finally {
                bufferedOutputStream.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

该服务器用单一线程处理每次请求,每个线程都将等待服务器处理完上一个请求之后才能获得响应。这里需要注意,纯HTTP请求的输入流的读取会遇到输入流阻塞的问题,因为HTTP请求并没有输入流可识别的EOF标记。从而导致服务器一直挂起在读取输入流的地方。它的解决方法如下:

  • 客户端关闭Socket连接,强制服务器关闭该Socket连接。但是同时也丢失服务器响应
  • 自定义协议,从而服务器可以识别数据的终点。

启动服务器

public static void main(String[] args) throws IOException, InterruptedException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        ServerSocket serverSocket = new ServerSocket(2048);
        executorService.execute(new SingleThreadServer(serverSocket));

//        TimeUnit.SECONDS.sleep(10);
//        System.out.println("shut down server");
//        executorService.shutdownNow();
    }

注意要先关闭之前占用2048端口号的服务器。

我们也可以使用代码来测试:

import java.io.*;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class TestSingleThreadServer {

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0 ; i<10 ; i++){
            final int threadId = i;
            executorService.execute(() ->{

                try {
                    Socket socket = new Socket("127.0.0.1", 20006);
                    socket.setSoTimeout(5000);

                    BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                    String line = bufferedReader.readLine();

                    System.out.println(threadId + ":" + line);

                    socket.close();;
                } catch (IOException e) {
                    e.printStackTrace();
                }

            });

        }

        TimeUnit.SECONDS.sleep(40);
        executorService.shutdownNow();
    }
}

Java实现多线程服务器

这里我们将为每一个Socket连接提供一个线程来处理。基本实现和上面差不多,只是将每一个Socket连接丢给一个额外的线程来处理。这里可以参考前面的简易聊天室来试着自己实现以下。

Java NIO实现复用服务器

NIO的出现改变了旧式Java读取IO流的方式。首先,它支持非阻塞式读取,其次它可以使用一个线程来管理多个信道。多线程表面上看起来可以同时处理多个Socket通信,但是多线程的管理本身也消耗相当多的资源。其次,很多信道的使用率往往并不高,一些信道往往并不是连通状态中。如果我们可以将资源直接赋予当前活跃的Socket通信的话,可以明显的提高资源利用率。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明原文出处。如若内容造成侵权/违法违规/事实不符,请联系SD编程学习网:675289112@qq.com进行投诉反馈,一经查实,立即删除!