From 5803aadd3f209eba1ffbb2cf7bf16778019dbee1 Mon Sep 17 00:00:00 2001 From: fredoboulo <fredoboulo@users.noreply.github.com> Date: Fri, 23 Feb 2018 23:55:57 +0100 Subject: [PATCH] Fix #524 : V1 and V2 protocol downgrades handle received data in handshake buffer This patch is upstream pull request, see: https://gihub.com/zeromq/jeromq/pull/527. It is merged on commit c2afa9c, and we can drop it on the 0.4.4 release. --- src/main/java/zmq/io/StreamEngine.java | 21 ++++++++++-- src/test/java/zmq/io/AbstractProtocolVersion.java | 41 +++++++++++++---------- src/test/java/zmq/io/V0ProtocolTest.java | 12 +++++++ src/test/java/zmq/io/V1ProtocolTest.java | 16 +++++++-- src/test/java/zmq/io/V2ProtocolTest.java | 16 +++++++-- 5 files changed, 81 insertions(+), 25 deletions(-) diff --git a/src/main/java/zmq/io/StreamEngine.java b/src/main/java/zmq/io/StreamEngine.java index b8933c92..fe2f2d8d 100644 --- a/src/main/java/zmq/io/StreamEngine.java +++ b/src/main/java/zmq/io/StreamEngine.java @@ -816,9 +816,7 @@ private boolean handshake() assert (bufferSize == headerSize); // Make sure the decoder sees the data we have already received. - greetingRecv.flip(); - inpos = greetingRecv; - insize = greetingRecv.limit(); + decodeDataAfterHandshake(0); // To allow for interoperability with peers that do not forward // their subscriptions, we inject a phantom subscription message @@ -846,6 +844,8 @@ else if (greetingRecv.get(revisionPos) == Protocol.V1.revision) { } encoder = new V1Encoder(errno, Config.OUT_BATCH_SIZE.getValue()); decoder = new V1Decoder(errno, Config.IN_BATCH_SIZE.getValue(), options.maxMsgSize, options.allocator); + + decodeDataAfterHandshake(V2_GREETING_SIZE); } else if (greetingRecv.get(revisionPos) == Protocol.V2.revision) { // ZMTP/2.0 framing. @@ -859,6 +859,8 @@ else if (greetingRecv.get(revisionPos) == Protocol.V2.revision) { } encoder = new V2Encoder(errno, Config.OUT_BATCH_SIZE.getValue()); decoder = new V2Decoder(errno, Config.IN_BATCH_SIZE.getValue(), options.maxMsgSize, options.allocator); + + decodeDataAfterHandshake(V2_GREETING_SIZE); } else { zmtpVersion = Protocol.V3; @@ -904,6 +906,19 @@ else if (greetingRecv.get(revisionPos) == Protocol.V2.revision) { return true; } + private void decodeDataAfterHandshake(int greetingSize) + { + final int pos = greetingRecv.position(); + if (pos > greetingSize) { + // data is present after handshake + greetingRecv.position(greetingSize).limit(pos); + + // Make sure the decoder sees this extra data. + inpos = greetingRecv; + insize = greetingRecv.remaining(); + } + } + private Msg identityMsg() { Msg msg = new Msg(options.identitySize); diff --git a/src/test/java/zmq/io/AbstractProtocolVersion.java b/src/test/java/zmq/io/AbstractProtocolVersion.java index e60db403..aa06b4a7 100644 --- a/src/test/java/zmq/io/AbstractProtocolVersion.java +++ b/src/test/java/zmq/io/AbstractProtocolVersion.java @@ -18,15 +18,18 @@ import zmq.SocketBase; import zmq.ZError; import zmq.ZMQ; +import zmq.ZMQ.Event; import zmq.util.Utils; public abstract class AbstractProtocolVersion { + protected static final int REPETITIONS = 1000; + static class SocketMonitor extends Thread { - private final Ctx ctx; - private final String monitorAddr; - private final List<ZMQ.Event> events = new ArrayList<>(); + private final Ctx ctx; + private final String monitorAddr; + private final ZMQ.Event[] events = new ZMQ.Event[1]; public SocketMonitor(Ctx ctx, String monitorAddr) { @@ -41,15 +44,15 @@ public void run() boolean rc = s.connect(monitorAddr); assertThat(rc, is(true)); // Only some of the exceptional events could fire - while (true) { - ZMQ.Event event = ZMQ.Event.read(s); - if (event == null && s.errno() == ZError.ETERM) { - break; - } - assertThat(event, notNullValue()); - - events.add(event); + + ZMQ.Event event = ZMQ.Event.read(s); + if (event == null && s.errno() == ZError.ETERM) { + s.close(); + return; } + assertThat(event, notNullValue()); + + events[0] = event; s.close(); } } @@ -69,11 +72,12 @@ public void run() boolean rc = ZMQ.setSocketOption(receiver, ZMQ.ZMQ_LINGER, 0); assertThat(rc, is(true)); - SocketMonitor monitor = new SocketMonitor(ctx, "inproc://monitor"); - monitor.start(); rc = ZMQ.monitorSocket(receiver, "inproc://monitor", ZMQ.ZMQ_EVENT_HANDSHAKE_PROTOCOL); assertThat(rc, is(true)); + SocketMonitor monitor = new SocketMonitor(ctx, "inproc://monitor"); + monitor.start(); + rc = ZMQ.bind(receiver, host); assertThat(rc, is(true)); @@ -81,17 +85,18 @@ public void run() OutputStream out = sender.getOutputStream(); for (ByteBuffer raw : raws) { out.write(raw.array()); - ZMQ.msleep(100); } Msg msg = ZMQ.recv(receiver, 0); assertThat(msg, notNullValue()); assertThat(new String(msg.data(), ZMQ.CHARSET), is(payload)); - ZMQ.msleep(500); - assertThat(monitor.events.size(), is(1)); - assertThat(monitor.events.get(0).event, is(ZMQ.ZMQ_EVENT_HANDSHAKE_PROTOCOL)); - assertThat((Integer) monitor.events.get(0).arg, is(version)); + monitor.join(); + + final Event event = monitor.events[0]; + assertThat(event, notNullValue()); + assertThat(event.event, is(ZMQ.ZMQ_EVENT_HANDSHAKE_PROTOCOL)); + assertThat((Integer) event.arg, is(version)); InputStream in = sender.getInputStream(); byte[] data = new byte[255]; diff --git a/src/test/java/zmq/io/V0ProtocolTest.java b/src/test/java/zmq/io/V0ProtocolTest.java index bd547d23..1a5b7aef 100644 --- a/src/test/java/zmq/io/V0ProtocolTest.java +++ b/src/test/java/zmq/io/V0ProtocolTest.java @@ -10,6 +10,18 @@ public class V0ProtocolTest extends AbstractProtocolVersion { + @Test + public void testFixIssue524() throws IOException, InterruptedException + { + for (int idx = 0; idx < REPETITIONS; ++idx) { + if (idx % 100 == 0) { + System.out.print(idx + " "); + } + testProtocolVersion0short(); + } + System.out.println(); + } + @Test(timeout = 2000) public void testProtocolVersion0short() throws IOException, InterruptedException { diff --git a/src/test/java/zmq/io/V1ProtocolTest.java b/src/test/java/zmq/io/V1ProtocolTest.java index e1045f34..764159d0 100644 --- a/src/test/java/zmq/io/V1ProtocolTest.java +++ b/src/test/java/zmq/io/V1ProtocolTest.java @@ -10,7 +10,19 @@ public class V1ProtocolTest extends AbstractProtocolVersion { - @Test(timeout = 2000) + @Test + public void testFixIssue524() throws IOException, InterruptedException + { + for (int idx = 0; idx < REPETITIONS; ++idx) { + if (idx % 100 == 0) { + System.out.print(idx + " "); + } + testProtocolVersion1short(); + } + System.out.println(); + } + + @Test public void testProtocolVersion1short() throws IOException, InterruptedException { List<ByteBuffer> raws = raws(0); @@ -25,7 +37,7 @@ public void testProtocolVersion1short() throws IOException, InterruptedException assertProtocolVersion(1, raws, "abcdefg"); } - @Test(timeout = 2000) + @Test public void testProtocolVersion1long() throws IOException, InterruptedException { List<ByteBuffer> raws = raws(0); diff --git a/src/test/java/zmq/io/V2ProtocolTest.java b/src/test/java/zmq/io/V2ProtocolTest.java index d5e64bce..7fda31bc 100644 --- a/src/test/java/zmq/io/V2ProtocolTest.java +++ b/src/test/java/zmq/io/V2ProtocolTest.java @@ -21,7 +21,19 @@ protected ByteBuffer identity() .put((byte) 0); } - @Test(timeout = 2000) + @Test + public void testFixIssue524() throws IOException, InterruptedException + { + for (int idx = 0; idx < REPETITIONS; ++idx) { + if (idx % 100 == 0) { + System.out.print(idx + " "); + } + testProtocolVersion2short(); + } + System.out.println(); + } + + @Test public void testProtocolVersion2short() throws IOException, InterruptedException { List<ByteBuffer> raws = raws(1); @@ -38,7 +50,7 @@ public void testProtocolVersion2short() throws IOException, InterruptedException assertProtocolVersion(2, raws, "abcdefg"); } - @Test(timeout = 2000) + @Test public void testProtocolVersion2long() throws IOException, InterruptedException { List<ByteBuffer> raws = raws(1);