线程间通信的方式 java(Java多线程通信)

1 基本概括

线程间通信的方式 java(Java多线程通信)

2 主要介绍

2.1 进程通信和线程通信的概念

进程通信

进程相互交换数据与信息。进程间通信有两种基本模型:共享内存和消息传递(消息队列)。

线程通信

原因:为了更好地协作,线程无论是交替式执行,还是接力式执行,都需要进行通信告知。

线程间通信:

(1)临界区

通过多线程的串行化允许线程对共享资源的访问,速度快

(2)互斥量

只有拥有互斥对象的线程才能对资源空间进行访问,因为互斥对象只有一个,所以可以保证公共资源不被多个线程访问

(3)信号量

用于控制多个线程对共享空间资源的访问,一般会限制同一时刻访问资源的最大线程数

(4)信号

通过通知操作的方式来控制线程间的同步,可以区分线程间的优先级

2.2 线程通信

2.2.1线程通信方式

共享内存:

共享内存这种方式比较常见,我们经常会设置一个共享变量。然后多个线程去操作同一个共享变量。从而达到线程通讯的目的。例如,我们使用多个线程去执行页面抓取任务,我们可以使用一个共享变量count来记录任务完成的数量。每当一个线程完成抓取任务,会在原来的count上执行加1操作。这样每个线程都可以通过获取这个count变量来获得当前任务的完成情况。当然必须要考虑的是共享变量的同步问题,这也共享内存容易出错的原因所在。

线程间通信的方式 java(Java多线程通信)

这种通讯模型中,不同的线程之间是没有直接联系的。都是通过共享变量这个“中间人”来进行交互。而这个“中间人”必要情况下还需被保护在临界区内(加锁或同步)。由此可见,一旦共享变量变得多起来,并且涉及到多种不同线程对象的交互,这种管理会变得非常复杂,极容易出现死锁等问题。

消息传递:

消息传递方式采取的是线程之间的直接通信,不同的线程之间通过显式的发送消息来达到交互目的。消息传递最有名的方式应该是actor模型了。在这种模型下,一切都是actor,所有的actor之间的通信都必须通过传递消息才能达到。每个actor都有一个收件箱(消息队列)用来保存收到其他actor传递来的消息。actor自己也可以给自己发送消息。这才是面向对象的精髓啊!

线程间通信的方式 java(Java多线程通信)

这种模型看起来比共享内存模型要复杂。但是一旦碰到复杂业务的话,actor模型的优势就体现出来了。

首先我们定义一个统计actor用来统计任务完成量。然后把多个网址(消息方式)发给多个抓取actor,抓取actor处理完任务后发送消息通知统计actor任务完成,统计actor对自己保存的变量count(这个只有统计actor才能看到)加一。

2.2.2 线程通信的具体方式

1.volatile关键字方式

volatile有两大特性,一是可见性,二是有序性,禁止指令重排序,其中可见性就是可以让线程之间进行通信。

volatile语义保证线程可见性有两个原则保证

所有volatile修饰的变量一旦被某个线程更改,必须立即刷新到主内存

所有volatile修饰的变量在使用之前必须重新读取主内存的值

volatile保证可见性原理图

线程间通信的方式 java(Java多线程通信)

如果将volatile关键字去掉,线程切换一定次数后将不能感知到flag的变化,最开始能感知是线程启动时间差的原因。

2.等待/通知机制

线程间通信的方式 java(Java多线程通信)

等待通知机制是基于wait和notify方法来实现的,在一个线程内调用该线程锁对象的wait方法,线程将进入等待队列进行等待直到被通知或者被唤醒。

为什么要必须获取锁?

因为调用wait方法时,必须要先释放锁,如果没有持有锁将会抛出异常。

wait()方法和notify()方法和notifyAll()方法

1 .wait()方法 语义:使得当前线程立刻停止运行,处于等待状态(WAIT),并将当前线程置入锁对象的等待队列中,直到被通知(notify)或被中断为止。 使用条件:wait方法只能在同步方法或同步代码块中使用...

2.notify()方法 语义:唤醒处于等待状态的线程 使用条件:notify()也必须在同步方法或同步代码块中调用,用来唤醒等待该...

3.notifyAll()方法 唤醒所有处于等待状态的线程

3.Join 方法

线程间通信的方式 java(Java多线程通信)

方法join()的作用是等待线程销毁,方法join具有使线程排队的作用,有些类似同步的运行效果,方法join的作用是使所属的线程对象x正常执行run方法的任务,而使当前线程z进行无限期的阻塞,等待线程

x执行完毕后再执行线程后面的代码。join与sychronized关键字的区别:jion在内部使用wait方法进行等待,而sychronized使用的是“对象监视器原理”作为同步。

join方法的源码

  public final synchronized void join(long millis)
    throws InterruptedException {
        long base = System.currentTimeMillis();
        long now = 0;
 
        if (millis < 0) {
            throw new IllegalArgumentException("timeout value is negative");
        }
 
        if (millis == 0) {
            while (isAlive()) {
                wait(0);
            }
        } else {
            while (isAlive()) {
                long delay = millis - now;
                if (delay <= 0) {
                    break;
                }
                wait(delay);
                now = System.currentTimeMillis() - base;
            }
        }
  }

 

4.管道

管道流是JAVA中线程通讯的常用方式之一,基本流程如下:

1)创建管道输出流PipedOutputStream pos和管道输入流PipedInputStream pis

2)将pos和pis匹配,pos.connect(pis);

3)将pos赋给信息输入线程,pis赋给信息获取线程,就可以实现线程间的通讯了

5.threadLocal方式

threadLocal方式的线程通信,不像以上四种方式是多个线程之间的通信,它更像是一个线程内部的通信,将当前线程和一个map绑定,在当前线程内可以任意存取数据,减省了方法调用间参数的传递。

2.3 进程通信

2.3.1进程通信分类

低级通信

由于进程的互斥和同步,需要在进程间交换一定的信息,故不少学者将它们也归为进程通信。只能传递状态和整数值(控制信息)。

特点:传送信息量小,效率低,每次通信传递的信息量固定,若传递较多信息则需要进行多次通信。

编程复杂:用户直接实现通信的细节,容易出现。

高级通信

提高信号通信的效率,传递大量数据,减轻程序编制的复杂度。

提供三种方式:

1.共享内存模式

2.消息传递模式

3.共享文件模式

2.3.2 进程通信方式

管道

(1)管道(Pipe):管道可用于具有亲缘关系进程间的通信,允许一个进程和另一个与它有共同祖先的进程之间进行通信。

(2)命名管道(named pipe):命名管道克服了管道没有名字的限制,因此,除具有管道所具有的功能外,它还允许无亲缘关系进程间的通信。命名管道在文件系统中有对应的文件名。命名管道通过命令mkfifo或系统调用mkfifo来创建。

系统IPC

(3)信号(Signal):信号是比较复杂的通信方式,用于通知接受进程有某种事件发生,除了用于进程间通信外,进程还可以发送 信号给进程本身;linux除了支持Unix早期信号语义函数sigal外,还支持语义符合Posix.1标准的信号函数sigaction(实际上,该函数是基于BSD的,BSD为了实现可靠信号机制,又能够统一对外接口,用sigaction函数重新实现了signal函数)。

(4)消息(Message)队列:消息队列是消息的链接表,包括Posix消息队列system V消息队列。有足够权限的进程可以向队列中添加消息,被赋予读权限的进程则可以读走队列中的消息。消息队列克服了信号承载信息量少,管道只能承载无格式字节流以及缓冲区大小受限等缺

(5)共享内存:使得多个进程可以访问同一块内存空间,是最快的可用IPC形式。是针对其他通信机制运行效率较低而设计的。往往与其它通信机制,如信号量结合使用,来达到进程间的同步及互斥。

(6)内存映射(mapped memory):内存映射允许任何多个进程间通信,每一个使用该机制的进程通过把一个共享的文件映射到自己的进程地址空间来实现它。

(7)信号量(semaphore):主要作为进程间以及同一进程不同线程之间的同步手段。

套接口

(8)套接口(Socket):更为一般的进程间通信机制,可用于不同机器之间的进程间通信。起初是由Unix系统的BSD分支开发出来的,但现在一般可以移植到其它类Unix系统上:Linux和System V的变种都支持套接字。

2.3.3 进程通信方式的优缺点

效率对比:

线程间通信的方式 java(Java多线程通信)

管道

a、较早的一种通信方式,缺点明显:只能用于有亲缘关系进程之间的通信;只支持单向数据流,如果要双向通信需要多创建一个管道来实现。

b、自身具备同步机制。

c、随进程持续。

信号

a、这种通信可携带的信息极少。不适合需要经常携带数据的通信。

b、不具备同步机制,类似于中断,什么时候产生信号,进程是不知道的。

消息队列

a、与共享内存和FIFO类似,使用一个路径名来实现各个无亲缘关系进程之间的通信。消息队列相比于其他方式有很多优点:它提供有格式的字节流,减少了开发人员的工作量;消息具有类型(system V)或优先级(posix)。其他方式都没有这些优点。

b、具备同步机制。

c、随内核持续。

共享内存

a、最快的一种通信方式,多个进程可同时访问同一片内存空间,相对其他方式来说具有更少的数据拷贝,效率较高。

b、需要结合信号灯或其他方式来实现多个进程间同步,自身不具备同步机制。

c、随内核持续,相比于随进程持续生命力更强。

FIFO

a、是有名管道,所以支持没有亲缘关系的进程通信。和共享内存类似,提供一个路径名字将各个无亲缘关系的进程关联起来。但是也需要创建两个描述符来实现双向通信。

b、自身具备同步机制。

c、随进程持续。

socket

a、使用socket通信的方式实现起来简单,可以使用因特网域和UNIX域来实现,使用因特网域可以实现不同主机之间的进出通信。

b、该方式自身携带同步机制,不需要额外的方式来辅助实现同步。

c、随进程持续。

3 实现方式

3.1 volatile实现多线程通信

public class VolatileDemo {
    private static volatile boolean flag = true;
    public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true){
                    if (flag){
                        System.out.println("trun on");
                        flag = false;
                    }
                }
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true){
                    if (!flag){
                        System.out.println("trun off");
                        flag = true;
                    }
                }
            }
        }).start();
    }
}

 

3.2 等待/通知机制(wait/notify机制)

public class WaitDemo {
    private static Object lock = new Object();
    private static  boolean flag = true;
    public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                synchronized (lock){
                    while (flag){
                        try {
                            System.out.println("wait start .......");
                            lock.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }

                    System.out.println("wait end ....... ");
                }
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                if (flag){
                    synchronized (lock){
                        if (flag){
                            lock.notify();
                            System.out.println("notify .......");
                            flag = false;
                        }

                    }
                }
            }
        }).start();
    }
}

 

3.3 join方法

/**
* Join方法就是挂起调用线程,直到被调用线程执行完毕后再继续执行。例:threadB线程中threadA的join方法,
* 所以threadB需在threadA执行完毕后才继续执行join后的代码,而主线程执行threadB.join(),所以最终主线程需等threadA和threadB执行完毕后才继续。
*
*/
public class JoinThread {

    public static void join() throws InterruptedException {
        long startTime = System.currentTimeMillis();
        Thread threadA = new Thread(() -> {
            try {
                log.info("threadA start");
                Thread.sleep(4000);
                log.info("threadA end");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        Thread threadB = new Thread(() -> {
            try {
                threadA.join();
                log.info("threadB start");
                Thread.sleep(3000);
                log.info("threadB end");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        threadA.start();
        threadB.start();


        threadB.join();
        log.info("run time [{}]", startTime - System.currentTimeMillis());
        log.info("main thread end");
    }
}

 

3.4 管道


public class testPipeConnection {

  public static void main(String[] args) {
    /**
     * 创建管道输出流
     */
    PipedOutputStream pos = new PipedOutputStream();
    /**
     * 创建管道输入流
     */
    PipedInputStream pis = new PipedInputStream();
    try {
      /**
       * 将管道输入流与输出流连接 此过程也可通过重载的构造函数来实现
       */
      pos.connect(pis);
    } catch (IOException e) {
      e.printStackTrace();
    }
    /**
     * 创建生产者线程
     */
    Producer p = new Producer(pos);
    /**
     * 创建消费者线程
     */
    Consumer1 c1 = new Consumer1(pis);
    /**
     * 启动线程
     */
    p.start();
    c1.start();
  }
}

/**
 * 生产者线程(与一个管道输入流相关联)
 * 
 */
class Producer extends Thread {
  private PipedOutputStream pos;

  public Producer(PipedOutputStream pos) {
    this.pos = pos;
  }

  public void run() {
    int i = 0;
    try {
      while(true)
      {
      this.sleep(3000);
      pos.write(i);
      i++;
      }
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}

/**
 * 消费者线程(与一个管道输入流相关联)
 * 
 */
class Consumer1 extends Thread {
  private PipedInputStream pis;

  public Consumer1(PipedInputStream pis) {
    this.pis = pis;
  }

  public void run() {
    try {
      while(true)
      {
      System.out.println("consumer1:"+pis.read());
      }
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
}

 

4 常见问题

1 进程间通信的方式有哪些?线程间通讯方式有哪些?
2 join方法的原理?
3 notify和notifyall有什么区别?
4 为什么wait方法要写在while循环里面而不是if呢?
5 在 Java 的并发编程中,什么是等待-通知机制?它是怎么实现的?
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发表评论

登录后才能评论