欧美一级特黄大片做受成人-亚洲成人一区二区电影-激情熟女一区二区三区-日韩专区欧美专区国产专区

基于akka怎樣實(shí)現(xiàn)RPC

這期內(nèi)容當(dāng)中小編將會給大家?guī)碛嘘P(guān)基于akka怎樣實(shí)現(xiàn)RPC,文章內(nèi)容豐富且以專業(yè)的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。

10年積累的成都網(wǎng)站制作、成都做網(wǎng)站、外貿(mào)營銷網(wǎng)站建設(shè)經(jīng)驗(yàn),可以快速應(yīng)對客戶對網(wǎng)站的新想法和需求。提供各種問題對應(yīng)的解決方案。讓選擇我們的客戶得到更好、更有力的網(wǎng)絡(luò)服務(wù)。我雖然不認(rèn)識你,你也不認(rèn)識我。但先網(wǎng)站設(shè)計(jì)后付款的網(wǎng)站建設(shè)流程,更有靈壽免費(fèi)網(wǎng)站建設(shè)讓你可以放心的選擇與我們合作。

目前的工作在基于akka實(shí)現(xiàn)數(shù)據(jù)服務(wù)總線,Akka 2.3中提供了 Cluster Sharing(分片集群)和Persistence功能可以很簡單的寫出一個大型的分布式集群的架構(gòu)。里面的一塊功能就是RPC(遠(yuǎn)程過程調(diào)用),這篇文章將會介紹一種實(shí)現(xiàn)方式。
akka rpc java
目錄[-]
akka-rpc(基于akka的rpc的實(shí)現(xiàn))
RPC
實(shí)現(xiàn)原理
Server端核心代碼
Client端核心代碼 
Demo
akka-rpc(基于akka的rpc的實(shí)現(xiàn))

代碼:http://git.oschina.net/for-1988/Simples

目前的工作在基于akka(java)實(shí)現(xiàn)數(shù)據(jù)服務(wù)總線,Akka 2.3中提供了 Cluster Sharing(分片集群)和Persistence功能可以很簡單的寫出一個大型的分布式集群的架構(gòu)。里面的一塊功能就是RPC(遠(yuǎn)程過程調(diào)用)。

RPC

遠(yuǎn)程過程調(diào)用(Remote Procedure Call,RPC)是一個計(jì)算機(jī)通信協(xié)議。該協(xié)議允許運(yùn)行于一臺計(jì)算機(jī)的程序調(diào)用另一臺計(jì)算機(jī)的子程序,而程序員無需額外地為這個交互作用編程。如果涉及的軟件采用面向?qū)ο缶幊?,那么遠(yuǎn)程過程調(diào)用亦可稱作遠(yuǎn)程調(diào)用或遠(yuǎn)程方法調(diào)用,例:Java RMI。

實(shí)現(xiàn)原理

整個RPC的調(diào)用過程完全基于akka來傳遞對象,因?yàn)樾枰M(jìn)行網(wǎng)絡(luò)通信,所以我們的接口實(shí)現(xiàn)類、調(diào)用參數(shù)以及返回值都需要實(shí)現(xiàn)java序列化接口??蛻舳烁?wù)端其實(shí)都是在一個Akka 集群關(guān)系中,Client跟Server都是集群中的一個節(jié)點(diǎn)。首先Client需要初始化RpcClient對象,在初始化的過程中,我們啟動了AkkaSystem,加入到整個集群中,并創(chuàng)建了負(fù)責(zé)與Server進(jìn)行通信的Actor。然后通過RpcClient中的getBean(Class<T> clz)方法獲取Server端的接口實(shí)現(xiàn)類的實(shí)例對象,然后通過動態(tài)代理攔截這個對象的所有方法。最后,在執(zhí)行方法的時(shí)候,在RpcBeanProxy中向Server發(fā)送CallMethod事件,執(zhí)行遠(yuǎn)程實(shí)現(xiàn)類的方法,獲取返回值給Client。

Server端核心代碼

public class RpcServer extends UntypedActor {
         private Map<String, Object> proxyBeans;

    public RpcServer(Map<Class<?>, Object> beans) {
        proxyBeans = new HashMap<String, Object>();
        for (Iterator<Class<?>> iterator = beans.keySet().iterator(); iterator
                .hasNext();) {
            Class<?> inface = iterator.next();
            proxyBeans.put(inface.getName(), beans.get(inface));
        }
    }

    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof RpcEvent.CallBean) {   //返回Server端的接口實(shí)現(xiàn)類的實(shí)例
            CallBean event = (CallBean) message;
            ReturnBean bean = new ReturnBean(
                    proxyBeans.get(event.getBeanName()), getSelf());
            getSender().tell(bean, getSelf());
        } else if (message instanceof RpcEvent.CallMethod) {
            CallMethod event = (CallMethod) message;
            Object bean = proxyBeans.get(event.getBeanName());
            Object[] params = event.getParams();
            List<Class<?>> paraTypes = new ArrayList<Class<?>>();
            Class<?>[] paramerTypes = new Class<?>[] {};
            if (params != null) {
                for (Object param : params) {
                    paraTypes.add(param.getClass());
                }
            }
            Method method = bean.getClass().getMethod(event.getMethodName(),
                    paraTypes.toArray(paramerTypes));
            Object o = method.invoke(bean, params);
            getSender().tell(o, getSelf());
        }
    }

}
啟動Server

public static void main(String[] args) {
        final Config config = ConfigFactory
                .parseString("akka.remote.netty.tcp.port=" + 2551)
                .withFallback(
                        ConfigFactory
                                .parseString("akka.cluster.roles = [RpcServer]"))
                .withFallback(ConfigFactory.load());

        ActorSystem system = ActorSystem.create("EsbSystem", config);
        
        // Server 加入發(fā)布的服務(wù)
        Map<Class<?>, Object> beans = new HashMap<Class<?>, Object>();
        beans.put(ExampleInterface.class, new ExampleInterfaceImpl());
        system.actorOf(Props.create(RpcServer.class, beans), "rpcServer");
    }
Client端核心代碼 

RpcClient類型集成了Thread,為了解決一個問題:因?yàn)锳kkaSystem在加入集群中的時(shí)候是異步的,所以我們在第一次new RpcClient對象的時(shí)候需要等待加入集群成功以后,才可以執(zhí)行下面的方法,不然獲取的 /user/rpcServer Route中沒有Server的Actor,請求會失敗。

public class RpcClient extends Thread {

    private ActorSystem system;

    private ActorRef rpc;

    private ActorRef clientServer;

    private static RpcClient instance = null;

    public RpcClient() {
        this.start();
        final Config config = ConfigFactory
                .parseString("akka.remote.netty.tcp.port=" + 2552)
                .withFallback(
                        ConfigFactory
                                .parseString("akka.cluster.roles = [RpcClient]"))
                .withFallback(ConfigFactory.load());
        system = ActorSystem.create("EsbSystem", config);

        int totalInstances = 100;
        Iterable<String> routeesPaths = Arrays.asList("/user/rpcServer");
        boolean allowLocalRoutees = false;
        ClusterRouterGroup clusterRouterGroup = new ClusterRouterGroup(
                new AdaptiveLoadBalancingGroup(
                        HeapMetricsSelector.getInstance(),
                        Collections.<String> emptyList()),
                new ClusterRouterGroupSettings(totalInstances, routeesPaths,
                        allowLocalRoutees, "RpcServer"));
        rpc = system.actorOf(clusterRouterGroup.props(), "rpcCall");
        clientServer = system.actorOf(Props.create(RpcClientServer.class, rpc),
                "client");
        Cluster.get(system).registerOnMemberUp(new Runnable() {  //加入集群成功后的回調(diào)事件,恢復(fù)當(dāng)前線程的中斷
            @Override
            public void run() {
                synchronized (instance) {
                    System.out.println("notify");
                    instance.notify();
                }
            }
        });

    }

    public static RpcClient getInstance() {
        if (instance == null) {
            instance = new RpcClient();
            synchronized (instance) {
                try {   //中斷當(dāng)前線程,等待加入集群成功后,恢復(fù)
                    System.out.println("wait");
                    instance.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        return instance;
    }

    public <T> T getBean(Class<T> clz) {
        Future<Object> future = Patterns.ask(clientServer,
                new RpcEvent.CallBean(clz.getName(), clientServer),
                new Timeout(Duration.create(5, TimeUnit.SECONDS)));
        try {
            Object o = Await.result(future,
                    Duration.create(5, TimeUnit.SECONDS));
            if (o != null) {
                ReturnBean returnBean = (ReturnBean) o;
                return (T) new RpcBeanProxy().proxy(returnBean.getObj(),
                        clientServer, clz);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
}
RpcClientServer

public class RpcClientServer extends UntypedActor {

    private ActorRef rpc;

    public RpcClientServer(ActorRef rpc) {
        this.rpc = rpc;
    }

    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof RpcEvent.CallBean) {  //向Server發(fā)送CallBean請求
            CallBean event = (CallBean) message;
            Future<Object> future = Patterns.ask(rpc, event, new Timeout(
                    Duration.create(5, TimeUnit.SECONDS)));
            Object o = Await.result(future,
                    Duration.create(5, TimeUnit.SECONDS));
            getSender().tell(o, getSelf());
        } else if (message instanceof RpcEvent.CallMethod) {  //向Server發(fā)送方法調(diào)用請求
            Future<Object> future = Patterns.ask(rpc, message, new Timeout(
                    Duration.create(5, TimeUnit.SECONDS)));
            Object o = Await.result(future,
                    Duration.create(5, TimeUnit.SECONDS));
            getSender().tell(o, getSelf());
        }
    }
}
RpcBeanProxy,客戶端的動態(tài)代理類

public class RpcBeanProxy implements InvocationHandler {

    private ActorRef rpcClientServer;

    private Class<?> clz;

    public Object proxy(Object target, ActorRef rpcClientServer, Class<?> clz) {
        this.rpcClientServer = rpcClientServer;
        this.clz = clz;
        return Proxy.newProxyInstance(target.getClass().getClassLoader(),
                target.getClass().getInterfaces(), this);
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args)
            throws Throwable {
        Object result = null;
        RpcEvent.CallMethod callMethod = new RpcEvent.CallMethod(
                method.getName(), args, clz.getName());
        Future<Object> future = Patterns.ask(rpcClientServer, callMethod,
                new Timeout(Duration.create(5, TimeUnit.SECONDS)));
        Object o = Await.result(future, Duration.create(5, TimeUnit.SECONDS));
        result = o;
        return result;
    }

}
Demo

Interface,Client和Server都需要這個類,必須實(shí)現(xiàn)序列化

public interface ExampleInterface extends Serializable{
    public String sayHello(String name);
}
實(shí)現(xiàn)類,只需要Server端存在這個類。

public class ExampleInterfaceImpl implements ExampleInterface {
    @Override
    public String sayHello(String name) {
        System.out.println("Be Called !");
        return "Hello " + name;
    }
}
Client調(diào)用

public static void main(String[] args) {
        RpcClient client = RpcClient.getInstance();
        long start = System.currentTimeMillis();
        
        ExampleInterface example = client.getBean(ExampleInterface.class);
        System.out.println(example.sayHello("rpc"));
        
        long time = System.currentTimeMillis() - start;
        System.out.println("time :" + time);
    }
 


這里第一次調(diào)用耗時(shí)比較長需要46毫秒,akka會對消息進(jìn)行優(yōu)化,調(diào)用多次以后時(shí)間為 1~2毫秒。

上述就是小編為大家分享的基于akka怎樣實(shí)現(xiàn)RPC了,如果剛好有類似的疑惑,不妨參照上述分析進(jìn)行理解。如果想知道更多相關(guān)知識,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。

新聞名稱:基于akka怎樣實(shí)現(xiàn)RPC
本文鏈接:http://aaarwkj.com/article44/ipdiee.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供App設(shè)計(jì)、網(wǎng)站收錄企業(yè)建站、靜態(tài)網(wǎng)站、微信小程序電子商務(wù)

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)

商城網(wǎng)站建設(shè)
亚洲av不卡一区二区在线观看 | 亚洲不卡高清一区二区三区| 国产精品福利手机在线观看| 欧美日韩三级国产在线| 正在播放日韩黄色精品| 色哟哟网站之中文字幕| 给我搜亚洲免费播放黄色大片| 中文字幕熟妇人妻av在线| 亚洲熟妇一区二区在线| 微拍福利一区二区三区| 国产高清自拍视频在线一区| 国产综合一区在线观看97| 亚洲老司机深夜福利| 日韩一级黄色片在线播放| 成人av在线免费播放| 熟女俱乐部五十路六十路| 中文字幕国产精品欧美| 久久热精品视频这里有| 91手机国产三级在线| 日韩精品中文字幕欧美乱| 精品综合亚洲中文字幕| 三欲一区二区三区中文字幕| 99亚洲综合一区二区三区| 少妇被又粗又硬猛烈进视频| 欧美日韩精品一区二区视频永久免 | 国产黄色一区二区三区四区| 亚洲一区二区三区熟女av| 国产91高清在线观看| 日本视频三区在线播放| 最近更新中文字幕不卡在线| 久久三级中文欧大战字幕| 欧美日韩亚洲精品亚洲欧洲| 伊人激情久久综合中文字幕| 欧美日韩亚洲中文字幕| 一区二区三区乱码国产在线| 亚洲av少妇一区二区成年男人| 四虎最新地址在线观看| 男人一插就想射的原因| 国产精品久久中文字幕亚洲| 亚洲精品一区二区日本| 女人裸体网站无遮挡午夜|