/*
 * Decompiled with CFR 0.152.
 */
package org.jgroups.tests;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Random;
import junit.framework.Test;
import junit.framework.TestCase;
import junit.framework.TestSuite;
import junit.textui.TestRunner;
import org.jgroups.Address;
import org.jgroups.BlockEvent;
import org.jgroups.Channel;
import org.jgroups.ExtendedReceiver;
import org.jgroups.JChannel;
import org.jgroups.MembershipListener;
import org.jgroups.Message;
import org.jgroups.MessageListener;
import org.jgroups.StreamingGetStateEvent;
import org.jgroups.StreamingSetStateEvent;
import org.jgroups.TimeoutException;
import org.jgroups.UnblockEvent;
import org.jgroups.View;
import org.jgroups.ViewId;
import org.jgroups.blocks.RpcDispatcher;
import org.jgroups.util.Util;

public class StreamingStateTransferTest
extends TestCase {
    private static final String CHANNEL_PROPS = "streaming-state-transfer.xml";
    private static final int INITIAL_NUMBER_OF_MEMBERS = 5;
    private int runningTime = 50000;
    private Random r = new Random();
    private boolean usePullMode = false;
    private boolean useDisp = false;
    private int size = 100;
    private static final int MEGABYTE = 0x100000;

    public StreamingStateTransferTest(String arg0) {
        super(arg0);
    }

    public void testTransfer() throws Exception {
        long start = System.currentTimeMillis();
        boolean running = true;
        ArrayList<GroupMember> members = new ArrayList<GroupMember>();
        for (int i = 0; i < 5; ++i) {
            GroupMember member = new GroupMember(this.usePullMode, this.useDisp, this.size);
            members.add(member);
            Thread t = new Thread(member);
            t.start();
            Util.sleep(this.getRandomDelayInSeconds(10, 12) * 1000);
        }
        while (running) {
            if (this.r.nextBoolean()) {
                Util.sleep(this.getRandomDelayInSeconds(10, 12) * 1000);
                GroupMember member = new GroupMember(this.usePullMode, this.useDisp, this.size);
                members.add(member);
                Thread t = new Thread(member);
                t.start();
            } else if (members.size() > 1) {
                Util.sleep(this.getRandomDelayInSeconds(3, 8) * 1000);
                GroupMember unluckyBastard = (GroupMember)members.get(this.r.nextInt(members.size()));
                if (!unluckyBastard.isCoordinator()) {
                    members.remove(unluckyBastard);
                    unluckyBastard.stopRunning();
                } else {
                    System.out.println("Not killing coordinator ");
                }
            }
            running = System.currentTimeMillis() - start <= (long)this.runningTime;
            System.out.println("Running time " + (System.currentTimeMillis() - start) / 1000L + " secs");
        }
        System.out.println("Done");
    }

    protected int getRandomDelayInSeconds(int from, int to) {
        return from + this.r.nextInt(to - from);
    }

    protected void setUp() throws Exception {
        String prop = System.getProperty("disp");
        if (prop != null) {
            this.useDisp = prop.equalsIgnoreCase("true");
            System.out.println("Using parameter disp=" + this.useDisp);
        }
        if ((prop = System.getProperty("pull")) != null) {
            this.usePullMode = prop.equalsIgnoreCase("true");
            System.out.println("Using parameter usePullMode=" + this.usePullMode);
        }
        if ((prop = System.getProperty("size")) != null) {
            this.size = Integer.parseInt(System.getProperty("size"));
            System.out.println("Using parameter size=" + this.size);
        }
        super.setUp();
    }

    protected void tearDown() throws Exception {
        super.tearDown();
    }

    public static Test suite() {
        return new TestSuite(StreamingStateTransferTest.class);
    }

    public static void main(String[] args) {
        String[] testCaseName = new String[]{StreamingStateTransferTest.class.getName()};
        TestRunner.main((String[])testCaseName);
    }

    private static class GroupMember
    implements Runnable,
    ExtendedReceiver {
        JChannel ch = null;
        View currentView;
        volatile boolean running = true;
        private int stateSize;
        private int bufferSize = 8192;
        private boolean usePullMode;
        private Random ran = new Random();
        private boolean useDispacher;

        public GroupMember(boolean pullMode, boolean dispMode, int size) {
            this.setStateSize(size * 0x100000);
            this.setUsePullMode(pullMode);
            this.setUseDispatcher(dispMode);
            try {
                this.ch = new JChannel(StreamingStateTransferTest.CHANNEL_PROPS);
                this.ch.setOpt(5, Boolean.TRUE);
                this.ch.setOpt(6, Boolean.TRUE);
                this.ch.setOpt(0, Boolean.TRUE);
                if (this.useDispacher) {
                    RpcDispatcher disp = new RpcDispatcher((Channel)this.ch, (MessageListener)this, (MembershipListener)this, (Object)this);
                } else if (!this.usePullMode) {
                    this.ch.setReceiver(this);
                }
                this.ch.connect("transfer");
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        }

        public final void setUsePullMode(boolean usePullMode) {
            this.usePullMode = usePullMode;
        }

        public final void setUseDispatcher(boolean useDispacher) {
            this.useDispacher = useDispacher;
        }

        public String getAddress() {
            if (this.ch != null && this.ch.isConnected()) {
                return this.ch.getLocalAddress().toString();
            }
            return null;
        }

        public void stopRunning() {
            this.running = false;
            System.out.println("Disconnect " + this.getAddress());
            if (this.ch != null) {
                this.ch.close();
            }
        }

        protected boolean isCoordinator() {
            if (this.ch == null) {
                return false;
            }
            Address local_addr = this.ch.getLocalAddress();
            if (local_addr == null) {
                return false;
            }
            View view = this.ch.getView();
            if (view == null) {
                return false;
            }
            ViewId vid = view.getVid();
            if (vid == null) {
                return false;
            }
            Address coord = vid.getCoordAddress();
            if (coord == null) {
                return false;
            }
            return local_addr.equals(coord);
        }

        public final void setStateSize(int stateSize) {
            this.stateSize = stateSize;
        }

        public void run() {
            Runnable r = new Runnable(this){
                private final /* synthetic */ GroupMember this$0;
                {
                    this.this$0 = this$0;
                }

                public void run() {
                    try {
                        if (GroupMember.access$000(this.this$0).nextBoolean()) {
                            this.this$0.ch.getState(null, 5000L);
                        } else {
                            String randomStateId = Long.toString(Math.abs(GroupMember.access$000(this.this$0).nextLong()), 36);
                            this.this$0.ch.getState(null, randomStateId, 5000L);
                        }
                    }
                    catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            };
            if (this.usePullMode) {
                new Thread(r).start();
            } else {
                r.run();
            }
            while (this.running) {
                Object msgReceived = null;
                try {
                    Object evt;
                    msgReceived = this.ch.receive(0L);
                    if (msgReceived instanceof BlockEvent) {
                        this.block();
                        this.ch.blockOk();
                    } else if (msgReceived instanceof UnblockEvent) {
                        this.unblock();
                    }
                    if (!this.running || msgReceived instanceof View) continue;
                    if (msgReceived instanceof StreamingGetStateEvent) {
                        evt = (StreamingGetStateEvent)msgReceived;
                        if (((StreamingGetStateEvent)evt).getStateId() != null) {
                            this.getState(((StreamingGetStateEvent)evt).getStateId(), ((StreamingGetStateEvent)evt).getArg());
                            continue;
                        }
                        this.getState(((StreamingGetStateEvent)evt).getArg());
                        continue;
                    }
                    if (!(msgReceived instanceof StreamingSetStateEvent)) continue;
                    evt = (StreamingSetStateEvent)msgReceived;
                    if (((StreamingSetStateEvent)evt).getStateId() != null) {
                        this.setState(((StreamingSetStateEvent)evt).getStateId(), ((StreamingSetStateEvent)evt).getArg());
                        continue;
                    }
                    this.setState(((StreamingSetStateEvent)evt).getArg());
                }
                catch (TimeoutException e) {
                }
                catch (Exception e) {
                    this.ch.close();
                    this.running = false;
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         * Loose catch block
         */
        public void getState(OutputStream ostream) {
            block9: {
                InputStream stream = Thread.currentThread().getContextClassLoader().getResourceAsStream("org/jgroups/JChannel.class");
                System.out.println(Thread.currentThread() + " at " + this.getAddress() + " is sending state of " + this.stateSize / 0x100000 + " MB");
                int markSize = 102400;
                byte[] buffer = new byte[this.bufferSize];
                int bytesRead = -1;
                for (int size = this.stateSize; size > 0; size -= bytesRead) {
                    stream.mark(markSize);
                    bytesRead = stream.read(buffer);
                    ostream.write(buffer);
                    stream.reset();
                }
                Object var9_7 = null;
                try {
                    ostream.flush();
                    ostream.close();
                }
                catch (IOException e2) {
                    e2.printStackTrace();
                }
                break block9;
                {
                    catch (IOException e) {
                        e.printStackTrace();
                        Object var9_8 = null;
                        try {
                            ostream.flush();
                            ostream.close();
                        }
                        catch (IOException e2) {
                            e2.printStackTrace();
                        }
                    }
                }
                catch (Throwable throwable) {
                    Object var9_9 = null;
                    try {
                        ostream.flush();
                        ostream.close();
                    }
                    catch (IOException e2) {
                        e2.printStackTrace();
                    }
                    throw throwable;
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void setState(InputStream istream) {
            int totalRead = 0;
            byte[] buffer = new byte[this.bufferSize];
            int bytesRead = -1;
            long start = System.currentTimeMillis();
            try {
                while ((bytesRead = istream.read(buffer)) >= 0) {
                    totalRead += bytesRead;
                }
            }
            catch (IOException e) {
                e.printStackTrace();
            }
            finally {
                try {
                    istream.close();
                }
                catch (IOException e) {
                    e.printStackTrace();
                }
            }
            long readingTime = System.currentTimeMillis() - start;
            System.out.println(Thread.currentThread() + " at " + this.getAddress() + " read state of " + totalRead / 0x100000 + " MB in " + readingTime + " msec");
        }

        public void receive(Message msg) {
        }

        public void setState(byte[] state) {
        }

        public void viewAccepted(View new_view) {
        }

        public void suspect(Address suspected_mbr) {
        }

        public void block() {
        }

        public void unblock() {
        }

        public byte[] getState() {
            return null;
        }

        public byte[] getState(String state_id) {
            return null;
        }

        public void setState(String state_id, byte[] state) {
        }

        public void getState(String state_id, OutputStream ostream) {
            System.out.println("Writing partial streaming state transfer for " + state_id);
            this.getState(ostream);
        }

        public void setState(String state_id, InputStream istream) {
            System.out.println("Reading partial streaming state transfer for " + state_id);
            this.setState(istream);
        }

        static /* synthetic */ Random access$000(GroupMember x0) {
            return x0.ran;
        }
    }
}

