博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
并发编程之:并行程序设计模式
阅读量:6215 次
发布时间:2019-06-21

本文共 12192 字,大约阅读时间需要 40 分钟。

hot3.png

1.Future模式

实现方式:

首先实现Data接口

public interface Data {    String getResult();}

实现真正执行业务的实现类

public class RealData implements Data{    private final String result;    public RealData(String param){        StringBuffer sb = new StringBuffer();        for (int i=0;i<10;i++){            try {                Thread.sleep(1000);            } catch (InterruptedException e) {                e.printStackTrace();            }            sb.append(param).append(i);        }        result = sb.toString();    }    @Override    public String getResult() {        return result;    }}

实现真实业务类的代理类

public class FutureData implements Data{    protected RealData realData;    protected boolean isReady = false;    public synchronized void setRealData(RealData realData){        if(isReady){            return;        }        isReady = true;        this.realData = realData;        notifyAll();    }    public synchronized String getResult() {        while (!isReady){            try {                wait();            } catch (InterruptedException e) {                e.printStackTrace();            }        }        return realData.getResult();    }}

通过client开启线程,获取futureData

public class Client {    public Data request(final String name){        final FutureData futureData = new FutureData();        new Thread(){            @Override            public void run() {                futureData.setRealData(new RealData(name));            }        }.start();        return futureData;    }}

测试

public static void main(String[] str){        Client client = new Client();        System.out.println("请求开始");        Data data = client.request("jok");        try {            System.out.println("做其他事情");            Thread.sleep(6000);            System.out.println("其他事情完成");        } catch (InterruptedException e) {            e.printStackTrace();        }        System.out.println(data.getResult());    }

JDK实现方式

首先实现业务类

public class RealData implements Callable
{ private String name; public RealData(String name) { this.name = name; } @Override public String call() throws Exception { //只是业务实现 StringBuffer stringBuffer = new StringBuffer(); for (int i = 0; i<10;i++){ stringBuffer.append(name).append(i); Thread.sleep(1000); } return stringBuffer.toString(); }}

执行方法

public static void main(String[] args) throws InterruptedException, ExecutionException {    FutureTask
future = new FutureTask
(new RealData("jok")); ExecutorService service = Executors.newCachedThreadPool(); //开始执行 System.out.println("开始执行"); service.execute(future); //做其他事情 System.out.println("执行其他事情"); Thread.sleep(6000); System.out.println("执行其他事情完成"); System.out.println(future.get());}

2.Master-Worker模式

master-worker模式的核心思想是,系统由两类进程协作工作:Master进程和Worker进程。Master进程负责接收和分配任务,Worker进程负责处理子任务。

212913_TEg0_3100849.png

由5个worker线程执行100个任务的实现例子

worker工人类

public abstract class Worker implements Runnable{    //任务子队列,用于取得子任务    private Queue  workQueue;    //子任务结果集    private Map
resultMap; public void setWorkQueue(Queue
workQueue) { this.workQueue = workQueue; } public void setResultMap(Map
resultMap) { this.resultMap = resultMap; } /** * 子任务处理逻辑,在子类中实现具体逻辑 * @param input * @return */ public abstract Object handle(Object input); @Override public void run() { while (true){ //从任务列表获取子任务 Object input = workQueue.poll(); if(input == null) break; //处理子任务 Object result = handle(input); //将结果存入到map中 resultMap.put(Integer.toString(input.hashCode()),result); } }}

Master类实现

public class Master {    //任务队列    protected Queue workerQueue = new ConcurrentLinkedDeque();    //Worker进程队列    protected Map
threadMap = new HashMap
(); //子任务处理结果集 protected Map
resultMap = new ConcurrentHashMap
(); /** * 判断任务是否已经全部结束 * @return */ public boolean isComplete(){ for (Map.Entry
entry : threadMap.entrySet()) { if(entry.getValue().getState()!=Thread.State.TERMINATED){ return false; } } return true; } public Master(Worker worker,int workerCount){ worker.setWorkQueue(workerQueue); worker.setResultMap(resultMap); for (int i=0;i
getResultMap(){ return resultMap; } /** * 开始执行所以子任务 */ public void execute(){ for (Thread thread : threadMap.values()){ thread.start(); } }}

实现业务逻辑的worker类

/** * 计算传入integer数据立方 * @authorAdministrator * @date2017/3/19 */public class PlusWorker extends Worker {    @Override    public Object handle(Object input) {        Integer in = (Integer)input;        return in*in*in;    }}

执行任务和结果处理代码

public static void main(String[] args){    //5个工作线程    Master master = new Master(new PlusWorker(),5);    //提交100个任务    for (int i=0;i<100;i++)        master.submit(i);    //开始执行,由5个线程执行100个任务    master.execute();    String key ;    int result = 0;    while (master.resultMap.size()>0 || !master.isComplete()){//当map中还存在结果,或者任务没有执行完时进入循环取结果        for (Map.Entry
entry :master.resultMap.entrySet()) { key = entry.getKey(); result += (Integer) entry.getValue(); master.resultMap.remove(key);//从resultMap中获取结果,取出一个就从map中移除一个 } } //执行结束 System.out.println(result);}

3.Guarded Suspension模式

该模式为保护暂停,其核心思想就是当有多个请求过来时,会先将请求放入一个列表中,然后有单独的线程从列表中取出请求进行处理。这样的模式可以避免当请求并发过大的时候不会给服务器造成过大压力。

下面是有返回值的请求实例:

定义response接口

public interface Response {    String getResult();}

response实际实现类

public class RealResponse implements Response {    private String result;    public void setResult(String result){        this.result = result;    }    @Override    public String getResult() {        return result;    }}

代理实现类

public class FutureResponse implements Response {    private RealResponse realResponse;    private boolean isReady = false;    public synchronized void setRealResponse(RealResponse realResponse){        if(isReady){            return;        }        this.realResponse = realResponse;        isReady = true;        notifyAll();    }    @Override    public synchronized String getResult() {        if(!isReady){            try {                wait();            } catch (InterruptedException e) {                e.printStackTrace();            }        }        return realResponse.getResult();    }}

request请求对象

public class Request {    private String name;    private FutureResponse futureResponse;    public void setFutureResponse(FutureResponse futureResponse) {        this.futureResponse = futureResponse;    }    public void setName(String name) {        this.name = name;    }    public FutureResponse getFutureResponse() {        return futureResponse;    }    public String getName() {        return name;    }}

存放request对象的列表工具

public class RequestQueue {    private Queue
requestQueue = new ConcurrentLinkedQueue
(); public synchronized Request getRequest(){ if(requestQueue.size()==0){ try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } return requestQueue.poll(); } public synchronized void setRequest(Request request){ requestQueue.add(request); notifyAll(); }}

客户端程序

public class ClientThread extends Thread {    private RequestQueue requestQueue;    private List
myRequest = new ArrayList
(); public ClientThread(RequestQueue requestQueue){ this.requestQueue = requestQueue; } @Override public void run() { for (int i=0;i<10;i++){ Request request = new Request(); FutureResponse futureResponse = new FutureResponse(); //构造请求request request.setName("jok"+i); request.setFutureResponse(futureResponse); requestQueue.setRequest(request); myRequest.add(request); } System.out.println("请求执行构造发送完成"); } public void showResult(){ for (Request request:myRequest) { System.out.println(request.getName()+":执行结果="+request.getFutureResponse().getResult()); } }}

服务端程序

public class ServerThread extends Thread {    private RequestQueue requestQueue = new RequestQueue();    public RequestQueue getRequestQueue() {        return requestQueue;    }    @Override    public void run() {        while (true){            final Request request = requestQueue.getRequest();            String result = "";            try {                //处理业务                result = request.getName()+"---do";                System.out.println("请求处理开始"+request.getName());                Thread.sleep(1000);            } catch (InterruptedException e) {                e.printStackTrace();            }            RealResponse realResponse = new RealResponse();            realResponse.setResult(result);            request.getFutureResponse().setRealResponse(realResponse);            System.out.println("处理请求"+request.getName()+"结束"+result);        }    }}

测试方法

public class Test {    public static void main(String[] args){        //服务器启动        ServerThread serverThread = new ServerThread();        serverThread.start();        System.out.println("服务器启动完毕");        //客户端开始        System.out.println("客户端开始执行");        ClientThread clientThread = new ClientThread(serverThread.getRequestQueue());        clientThread.start();        System.out.println("客户端发送完毕");        clientThread.showResult();    }}

4.生产者-消费者模式

232732_9O0E_3100849.png

首先定义任务数据

public final class PCData {    private final int data;    public PCData(int data){        this.data = data;    }    public int getData() {        return data;    }    @Override    public String toString() {        return "data="+data;    }}

生产者代码

public class Producer implements Runnable {    private volatile boolean isRunning = true;    private BlockingQueue
blockingQueue;//共享空间 private static AtomicInteger count = new AtomicInteger(); public Producer(BlockingQueue
blockingQueue){ this.blockingQueue = blockingQueue; } @Override public void run() { PCData data = null; try { while (isRunning){ Thread.sleep(2000); data = new PCData(count.incrementAndGet()); System.out.println("生产者线程"+Thread.currentThread().getId()+"生产数据:"+count); blockingQueue.offer(data,2, TimeUnit.SECONDS);//添加数据到阻塞队列 } System.out.println("生产者线程停止"+Thread.currentThread().getId()); }catch (Exception e){ e.printStackTrace(); Thread.currentThread().interrupt(); } } public void stop(){ isRunning = false; }}

消费者代码

public class Consumer implements Runnable {    private BlockingQueue
blockingQueue; public Consumer(BlockingQueue
blockingQueue){ this.blockingQueue = blockingQueue; } @Override public void run() { try { PCData data = null; while (true){ data = blockingQueue.take();//如果队列里面没有会wait等待数据放入 if(data != null){ int n = data.getData()*data.getData(); System.out.println("消费者线程"+Thread.currentThread().getId()+"消费数据"+ MessageFormat.format("{0}*{1}={2}",data.getData(),data.getData(),n)); Thread.sleep(4000); } } }catch (Exception e){ e.printStackTrace(); Thread.currentThread().interrupt(); } }}

测试程序代码

//定义阻塞队列作为共享数据空间BlockingQueue
blockingQueue = new LinkedBlockingQueue
();//生产者Producer p = new Producer(blockingQueue);Producer p2 = new Producer(blockingQueue);//消费者Consumer c = new Consumer(blockingQueue);Consumer c2 = new Consumer(blockingQueue);//线程池执行ExecutorService service = Executors.newCachedThreadPool();service.execute(p);service.execute(p2);service.execute(c);service.execute(c2);Thread.sleep(6000);p.stop();p2.stop();Thread.sleep(2000);service.shutdown();

转载于:https://my.oschina.net/u/3100849/blog/862641

你可能感兴趣的文章
我的友情链接
查看>>
安全测试 -web
查看>>
Linux安全攻略 SSH服务连接时常见问题解答
查看>>
python paramiko 使用
查看>>
Protobuf +ZeroRpc 高性能分布式RPC 数据传输
查看>>
Mac Apache PHP 基础环境配置
查看>>
在svg里面画虚线
查看>>
Hadoop专业解决方案之构建Hadoop企业级应用
查看>>
ACE 关于网络模块
查看>>
Linux下加载iso光盘镜像的方法
查看>>
差分法详解
查看>>
基于ITIL的运维系统(1)——开发背景和理念
查看>>
多线程 java文件复制,文件提取,删除,zip压缩工具
查看>>
mantis + Linux 安装步骤(笔记)
查看>>
阿里云CentOS搭建系统
查看>>
我的友情链接
查看>>
discuz url静态化 apache,nginx
查看>>
测试 from Windows Live Writer
查看>>
iOS开发7:自动旋转与调整大小
查看>>
简单聊聊SOA和微服务
查看>>