[sip-comm-dev] Re: svn commit: r7096 - trunk/src/net/java/sip/communicator/impl/neomedia: . codec/video/h264


#1

Hi Lubo,

Just some ideas on your latest modifications.

Wouldn't it make sense to use a standard implementation for a blocking
queue, for example java.util.concurrent.LinkedBlockingQueue<E>? This would
save some code.
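
A minimal sketch of what this suggestion might look like, assuming a bounded LinkedBlockingQueue takes over the wait()/notifyAll() handling from the commit. The class name BlockingPacketQueue and the use of byte[] in place of RawPacket are illustrative assumptions, not part of the commit.

```java
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Hypothetical replacement for the hand-written wait()/notifyAll() queue.
 * byte[] stands in for RawPacket to keep the sketch self-contained.
 */
class BlockingPacketQueue
{
    /** Same bound as MAX_PACKETS_PER_MILLIS_POLICY_PACKET_QUEUE_CAPACITY. */
    static final int CAPACITY = 256;

    private final LinkedBlockingQueue<byte[]> queue
        = new LinkedBlockingQueue<byte[]>(CAPACITY);

    /**
     * Queues a packet, blocking while the queue is full.
     * Returns false if the calling thread was interrupted while waiting.
     */
    boolean write(byte[] packet)
    {
        try
        {
            queue.put(packet);
            return true;
        }
        catch (InterruptedException iex)
        {
            Thread.currentThread().interrupt(); // preserve the interrupt
            return false;
        }
    }

    /**
     * Removes the oldest packet, blocking while the queue is empty.
     * Returns null if the calling thread was interrupted while waiting.
     */
    byte[] take()
    {
        try
        {
            return queue.take();
        }
        catch (InterruptedException iex)
        {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    /** Number of packets currently waiting to be sent. */
    int size()
    {
        return queue.size();
    }
}
```

With this shape, put() and take() cover both blocking conditions (queue full, queue empty), so the explicit synchronized blocks around the list would no longer be needed.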

The java.util.concurrent package also has a DelayQueue that manages
time-based queues, similar to your time constraint management. DelayQueue has
no capacity management, so I would use a counting Semaphore from the same
package to manage the capacity (acquiring a permit when inserting a packet
into the DelayQueue and releasing the permit when removing a packet).
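
A rough sketch of the DelayQueue plus Semaphore idea, under stated assumptions. The names TimedPacket and PacedPacketQueue are hypothetical, byte[] stands in for RawPacket, and the sketch spaces packets evenly over the interval, which differs from the burst-then-wait behaviour of the committed code.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Hypothetical element type: a packet plus the earliest time it may be sent. */
class TimedPacket implements Delayed
{
    final byte[] data;
    private final long sendAtNanos;

    TimedPacket(byte[] data, long sendAtNanos)
    {
        this.data = data;
        this.sendAtNanos = sendAtNanos;
    }

    public long getDelay(TimeUnit unit)
    {
        return unit.convert(
                sendAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    public int compareTo(Delayed other)
    {
        if (other instanceof TimedPacket)
            return Long.signum(sendAtNanos - ((TimedPacket) other).sendAtNanos);
        return Long.signum(
                getDelay(TimeUnit.NANOSECONDS)
                    - other.getDelay(TimeUnit.NANOSECONDS));
    }
}

/** Hypothetical paced queue: DelayQueue for timing, Semaphore for capacity. */
class PacedPacketQueue
{
    private final DelayQueue<TimedPacket> queue = new DelayQueue<TimedPacket>();
    private final Semaphore permits;
    private final long spacingNanos;
    private long nextSendNanos = System.nanoTime(); // guarded by "this"

    PacedPacketQueue(int capacity, int maxPackets, long perMillis)
    {
        permits = new Semaphore(capacity);
        // Assumption: spread maxPackets evenly over perMillis.
        spacingNanos = Math.max(
                1, TimeUnit.MILLISECONDS.toNanos(perMillis) / maxPackets);
    }

    /** Blocks while "capacity" packets are pending; false if interrupted. */
    boolean write(byte[] data)
    {
        try
        {
            permits.acquire(); // one permit per queued packet
        }
        catch (InterruptedException iex)
        {
            Thread.currentThread().interrupt();
            return false;
        }
        synchronized (this)
        {
            nextSendNanos = Math.max(nextSendNanos, System.nanoTime());
            queue.put(new TimedPacket(data, nextSendNanos));
            nextSendNanos += spacingNanos;
        }
        return true;
    }

    /** Blocks until the next packet is due; null if interrupted. */
    byte[] take()
    {
        try
        {
            byte[] data = queue.take().data;
            permits.release(); // the packet left the queue
            return data;
        }
        catch (InterruptedException iex)
        {
            Thread.currentThread().interrupt();
            return null;
        }
    }
}
```

The send thread would then reduce to a loop around take() followed by the socket send, with no manual sleep arithmetic.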

IMHO, using these classes could save some code and at the same time take
advantage of optimized concurrent implementations.

BTW, if you are too busy I can give it a try ;-) and you can review it before
we actually check it in.

Regards,
Werner

···

On 07.05.2010 17:33, lubomir_m@dev.java.net wrote:

Author: lubomir_m
Date: 2010-05-07 15:33:40+0000
New Revision: 7096

Modified:
   trunk/src/net/java/sip/communicator/impl/neomedia/MediaStreamImpl.java
   trunk/src/net/java/sip/communicator/impl/neomedia/RTPConnectorOutputStream.java
   trunk/src/net/java/sip/communicator/impl/neomedia/VideoMediaStreamImpl.java
   trunk/src/net/java/sip/communicator/impl/neomedia/codec/video/h264/DePacketizer.java

Log:
Allows controlling the number of RTP packets sent per millisecond in the video stream of a call.

Modified: trunk/src/net/java/sip/communicator/impl/neomedia/RTPConnectorOutputStream.java
Url: https://sip-communicator.dev.java.net/source/browse/sip-communicator/trunk/src/net/java/sip/communicator/impl/neomedia/RTPConnectorOutputStream.java?view=diff&rev=7096&p1=trunk/src/net/java/sip/communicator/impl/neomedia/RTPConnectorOutputStream.java&p2=trunk/src/net/java/sip/communicator/impl/neomedia/RTPConnectorOutputStream.java&r1=7095&r2=7096

--- trunk/src/net/java/sip/communicator/impl/neomedia/RTPConnectorOutputStream.java (original)
+++ trunk/src/net/java/sip/communicator/impl/neomedia/RTPConnectorOutputStream.java 2010-05-07 15:33:40+0000
@@ -13,12 +13,33 @@
import javax.media.rtp.*;
  
/**
+ *
  * @author Bing SU (nova.su@gmail.com)
  * @author Lubomir Marinov
  */
public class RTPConnectorOutputStream
     implements OutputDataStream
{
+
+ /**
+ * The maximum number of packets to be sent to be kept in the queue of
+ * <tt>MaxPacketsPerMillisPolicy</tt>. When the maximum is reached, the next
+ * attempt to write a new packet in the queue will block until at least one
+ * packet from the queue is sent. Defined in order to prevent
+ * <tt>OutOfMemoryError</tt>s which, technically, may arise if the capacity
+ * of the queue is unlimited.
+ */
+ private static final int
+ MAX_PACKETS_PER_MILLIS_POLICY_PACKET_QUEUE_CAPACITY
+ = 256;
+
+ /**
+ * The functionality which allows this <tt>OutputDataStream</tt> to control
+ * how many RTP packets it sends through its <tt>DatagramSocket</tt> per a
+ * specific number of milliseconds.
+ */
+ private MaxPacketsPerMillisPolicy maxPacketsPerMillisPolicy;
+
     /**
      * UDP socket used to send packet data
      */
@@ -77,8 +98,8 @@
      *
      * @param remoteAddr target ip address
      * @param remotePort target port
- * @return true if the target is in stream target list and can be removed
- * false if not
+ * @return <tt>true</tt> if the target is in stream target list and can be
+ * removed; <tt>false</tt>, otherwise
      */
     public boolean removeTarget(InetAddress remoteAddr, int remotePort)
     {
@@ -105,37 +126,321 @@
         targets.clear();
     }
  
- /*
- * Implements OutputDataStream#write(byte[], int, int).
+ /**
+ * Sends a specific RTP packet through the <tt>DatagramSocket</tt> of this
+ * <tt>OutputDataSource</tt>.
+ *
+ * @param packet the RTP packet to be sent through the
+ * <tt>DatagramSocket</tt> of this <tt>OutputDataSource</tt>
+ * @return <tt>true</tt> if the specified <tt>packet</tt> was successfully
+ * sent; otherwise, <tt>false</tt>
      */
- public int write(byte[] buffer, int offset, int length)
+ private boolean send(RawPacket packet)
     {
- RawPacket pkt = createRawPacket(buffer, offset, length);
-
- /*
- * If we got extended, the delivery of the packet may have been
- * canceled.
- */
- if (pkt == null)
- return length;
-
         for (InetSocketAddress target : targets)
+ {
             try
             {
- socket
- .send(
+ socket.send(
                         new DatagramPacket(
- pkt.getBuffer(),
- pkt.getOffset(),
- pkt.getLength(),
+ packet.getBuffer(),
+ packet.getOffset(),
+ packet.getLength(),
                                 target.getAddress(),
                                 target.getPort()));
             }
             catch (IOException ex)
             {
                 // TODO error handling
- return -1;
+ return false;
             }
+ }
+ return true;
+ }
+
+ /**
+ * Sets the maximum number of RTP packets to be sent by this
+ * <tt>OutputDataStream</tt> through its <tt>DatagramSocket</tt> per
+ * a specific number of milliseconds.
+ *
+ * @param maxPackets the maximum number of RTP packets to be sent by this
+ * <tt>OutputDataStream</tt> through its <tt>DatagramSocket</tt> per the
+ * specified number of milliseconds; <tt>-1</tt> if no maximum is to be set
+ * @param perMillis the number of milliseconds per which <tt>maxPackets</tt>
+ * are to be sent by this <tt>OutputDataStream</tt> through its
+ * <tt>DatagramSocket</tt>
+ */
+ public void setMaxPacketsPerMillis(int maxPackets, long perMillis)
+ {
+ if (maxPacketsPerMillisPolicy == null)
+ {
+ if (maxPackets > 0)
+ {
+ if (perMillis < 1)
+ throw new IllegalArgumentException("perMillis");
+
+ maxPacketsPerMillisPolicy
+ = new MaxPacketsPerMillisPolicy(maxPackets, perMillis);
+ }
+ }
+ else
+ {
+ maxPacketsPerMillisPolicy
+ .setMaxPacketsPerMillis(maxPackets, perMillis);
+ }
+ }
+
+ /**
+ * Implements {@link OutputDataStream#write(byte[], int, int)}.
+ *
+ * @param buffer
+ * @param offset
+ * @param length
+ * @return
+ */
+ public int write(byte[] buffer, int offset, int length)
+ {
+ if (maxPacketsPerMillisPolicy != null)
+ {
+ byte[] newBuffer = new byte[length];
+
+ System.arraycopy(buffer, offset, newBuffer, 0, length);
+ buffer = newBuffer;
+ offset = 0;
+ }
+
+ RawPacket packet = createRawPacket(buffer, offset, length);
+
+ /*
+ * If we got extended, the delivery of the packet may have been
+ * canceled.
+ */
+ if (packet != null)
+ {
+ if (maxPacketsPerMillisPolicy == null)
+ {
+ if (!send(packet))
+ return -1;
+ }
+ else
+ maxPacketsPerMillisPolicy.write(packet);
+ }
         return length;
     }
+
+ /**
+ * Implements the functionality which allows this <tt>OutputDataStream</tt>
+ * to control how many RTP packets it sends through its
+ * <tt>DatagramSocket</tt> per a specific number of milliseconds.
+ */
+ private class MaxPacketsPerMillisPolicy
+ {
+
+ /**
+ * The maximum number of RTP packets to be sent by this
+ * <tt>OutputDataStream</tt> through its <tt>DatagramSocket</tt> per
+ * {@link #perMillis} milliseconds.
+ */
+ private int maxPackets = -1;
+
+ /**
+ * The time stamp in nanoseconds of the start of the current
+ * <tt>perNanos</tt> interval.
+ */
+ private long millisStartTime = 0;
+
+ /**
+ * The list of RTP packets to be sent through the
+ * <tt>DatagramSocket</tt> of this <tt>OutputDataSource</tt>.
+ */
+ private final List<RawPacket> packetQueue
+ = new LinkedList<RawPacket>();
+
+ /**
+ * The number of RTP packets already sent during the current
+ * <tt>perNanos</tt> interval.
+ */
+ private long packetsSentInMillis = 0;
+
+ /**
+ * The time interval in nanoseconds during which {@link #maxPackets}
+ * number of RTP packets are to be sent through the
+ * <tt>DatagramSocket</tt> of this <tt>OutputDataSource</tt>.
+ */
+ private long perNanos = -1;
+
+ /**
+ * The <tt>Thread</tt> which is to send the RTP packets in
+ * {@link #packetQueue} through the <tt>DatagramSocket</tt> of this
+ * <tt>OutputDataSource</tt>.
+ */
+ private Thread sendThread;
+
+ /**
+ * Initializes a new <tt>MaxPacketsPerMillisPolicy</tt> instance which
+ * is to control how many RTP packets this <tt>OutputDataSource</tt> is
+ * to send through its <tt>DatagramSocket</tt> per a specific number of
+ * milliseconds.
+ *
+ * @param maxPackets the maximum number of RTP packets to be sent per
+ * <tt>perMillis</tt> milliseconds through the <tt>DatagramSocket</tt>
+ * of this <tt>OutputDataStream</tt>
+ * @param perMillis the number of milliseconds per which a maximum of
+ * <tt>maxPackets</tt> RTP packets are to be sent through the
+ * <tt>DatagramSocket</tt> of this <tt>OutputDataStream</tt>
+ */
+ public MaxPacketsPerMillisPolicy(int maxPackets, long perMillis)
+ {
+ setMaxPacketsPerMillis(maxPackets, perMillis);
+ }
+
+ /**
+ * Sends the RTP packets in {@link #packetQueue} in accord with
+ * {@link #maxPackets} and {@link #perMillis}.
+ */
+ private void runInSendThread()
+ {
+ try
+ {
+ while (true)
+ {
+ RawPacket packet;
+
+ synchronized (packetQueue)
+ {
+ while (packetQueue.size() < 1)
+ {
+ try
+ {
+ packetQueue.wait();
+ }
+ catch (InterruptedException iex)
+ {
+ }
+ }
+
+ packet = packetQueue.remove(0);
+ packetQueue.notifyAll();
+ }
+
+ long time = System.nanoTime();
+ long millisRemainingTime = time - millisStartTime;
+
+ if ((perNanos < 1)
+ || (millisRemainingTime >= perNanos))
+ {
+ millisStartTime = time;
+ packetsSentInMillis = 0;
+ }
+ else if ((maxPackets > 0)
+ && (packetsSentInMillis >= maxPackets))
+ {
+ while (true)
+ {
+ millisRemainingTime
+ = System.nanoTime() - millisStartTime;
+ if (millisRemainingTime >= perNanos)
+ break;
+ try
+ {
+ Thread.sleep(
+ millisRemainingTime / 1000000,
+ (int) (millisRemainingTime % 1000000));
+ }
+ catch (InterruptedException iex)
+ {
+ }
+ }
+ millisStartTime = System.nanoTime();
+ packetsSentInMillis = 0;
+ }
+
+ send(packet);
+ packetsSentInMillis++;
+ }
+ }
+ finally
+ {
+ synchronized (packetQueue)
+ {
+ if (Thread.currentThread().equals(sendThread))
+ sendThread = null;
+ }
+ }
+ }
+
+ /**
+ * Sets the maximum number of RTP packets to be sent by this
+ * <tt>OutputDataStream</tt> through its <tt>DatagramSocket</tt> per
+ * a specific number of milliseconds.
+ *
+ * @param maxPackets the maximum number of RTP packets to be sent by
+ * this <tt>OutputDataStream</tt> through its <tt>DatagramSocket</tt>
+ * per the specified number of milliseconds; <tt>-1</tt> if no maximum
+ * is to be set
+ * @param perMillis the number of milliseconds per which
+ * <tt>maxPackets</tt> are to be sent by this <tt>OutputDataStream</tt>
+ * through its <tt>DatagramSocket</tt>
+ */
+ public void setMaxPacketsPerMillis(int maxPackets, long perMillis)
+ {
+ if (maxPackets < 1)
+ {
+ this.maxPackets = -1;
+ this.perNanos = -1;
+ }
+ else
+ {
+ if (perMillis < 1)
+ throw new IllegalArgumentException("perMillis");
+
+ this.maxPackets = maxPackets;
+ this.perNanos = perMillis * 1000000;
+ }
+ }
+
+ /**
+ * Queues a specific RTP packet to be sent through the
+ * <tt>DatagramSocket</tt> of this <tt>OutputDataStream</tt>.
+ *
+ * @param packet the RTP packet to be queued for sending through the
+ * <tt>DatagramSocket</tt> of this <tt>OutputDataStream</tt>
+ */
+ public void write(RawPacket packet)
+ {
+ synchronized (packetQueue)
+ {
+ while (packetQueue.size()
+ >= MAX_PACKETS_PER_MILLIS_POLICY_PACKET_QUEUE_CAPACITY)
+ {
+ try
+ {
+ packetQueue.wait();
+ }
+ catch (InterruptedException iex)
+ {
+ }
+ }
+
+ packetQueue.add(packet);
+
+ if (sendThread == null)
+ {
+ sendThread
+ = new Thread(getClass().getName())
+ {
+ @Override
+ public void run()
+ {
+ runInSendThread();
+ }
+ };
+ sendThread.setDaemon(true);
+ sendThread.start();
+ }
+
+ packetQueue.notifyAll();
+ }
+ }
+ }
}

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@sip-communicator.dev.java.net
For additional commands, e-mail: dev-help@sip-communicator.dev.java.net


#2

Hi Werner,

Please feel free to apply and commit modifications to the current
implementation as you see fit!

I'd only like to note that we're still experimenting with the whole
idea of "pacing" packets over a given amount of time, and it may undergo
changes (e.g. switching it to enforce a specific bit rate instead).

As to the list/queue and the packet buffer copying on #write, I
thought it could turn out to be desirable to switch them to a strategy
which produces less garbage.
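
One possible shape of such a strategy, sketched under assumptions: instead of allocating a fresh byte[] for every write, buffers are recycled through a small pool. The class PacketBufferPool and its methods are hypothetical and not part of the commit. Because a pooled buffer can be longer than the packet, the packet length has to be tracked separately (RawPacket already carries an offset and a length).

```java
import java.util.concurrent.ArrayBlockingQueue;

/** Hypothetical pool that recycles packet buffers to reduce garbage. */
class PacketBufferPool
{
    private final ArrayBlockingQueue<byte[]> free;
    private final int bufferSize;

    /**
     * @param buffers maximum number of idle buffers kept for reuse
     * @param bufferSize size of each pooled buffer, e.g. the largest
     * expected packet
     */
    PacketBufferPool(int buffers, int bufferSize)
    {
        this.free = new ArrayBlockingQueue<byte[]>(buffers);
        this.bufferSize = bufferSize;
    }

    /**
     * Copies length bytes of src into a recycled buffer when one is idle
     * and large enough; otherwise allocates a new buffer.
     */
    byte[] copy(byte[] src, int offset, int length)
    {
        byte[] dst = (length <= bufferSize) ? free.poll() : null;

        if (dst == null)
            dst = new byte[Math.max(length, bufferSize)];
        System.arraycopy(src, offset, dst, 0, length);
        return dst;
    }

    /**
     * Hands a buffer back once its packet has been sent. Oversized buffers,
     * and buffers arriving while the pool is full, are left to the GC.
     */
    void release(byte[] buffer)
    {
        if (buffer.length == bufferSize)
            free.offer(buffer);
    }
}
```

In this sketch the writer would call copy() where the commit allocates newBuffer, and the send thread would call release() after the socket send.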

Best regards,
Lubo

···

On Sat, May 8, 2010 at 11:38 AM, Werner Dittmann <Werner.Dittmann@t-online.de> wrote:



#3

Lubo,

I'll test some ways to simplify the code using classes from the
java.util.concurrent package.

Just one question: which part of the code will stop the thread that sends
the packets in the time controlled loop? I don't see any exit condition
for this output connector or a close() call.

Regards,
Werner

···

On 08.05.2010 23:45, Lubomir Marinov wrote:



#4

Hi Werner,

I didn't see an exit condition or a close() call. I guess the thread
could, for example, die out by itself after some time of inactivity.
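
A minimal sketch of the "die out after inactivity" idea, assuming the send thread polls the queue with a timeout and exits when nothing arrives. The class IdleTimeoutSender is hypothetical, byte[] stands in for RawPacket, and a counter stands in for the real socket send.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sender whose thread exits after a period without packets. */
class IdleTimeoutSender
{
    private final LinkedBlockingQueue<byte[]> queue
        = new LinkedBlockingQueue<byte[]>(256);
    private final AtomicInteger sent = new AtomicInteger();
    private final long idleMillis;
    private final Object lock = new Object();
    private Thread sendThread; // guarded by lock

    IdleTimeoutSender(long idleMillis)
    {
        this.idleMillis = idleMillis;
    }

    /** Queues a packet and (re)starts the send thread if none is running. */
    boolean write(byte[] packet)
    {
        try
        {
            queue.put(packet);
        }
        catch (InterruptedException iex)
        {
            Thread.currentThread().interrupt();
            return false;
        }
        synchronized (lock)
        {
            if (sendThread == null)
            {
                sendThread = new Thread("IdleTimeoutSender")
                {
                    @Override
                    public void run()
                    {
                        runInSendThread();
                    }
                };
                sendThread.setDaemon(true);
                sendThread.start();
            }
        }
        return true;
    }

    private void runInSendThread()
    {
        while (true)
        {
            byte[] packet = null;

            try
            {
                packet = queue.poll(idleMillis, TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException iex)
            {
                // Treated like a timeout, as the commit also ignores it.
            }
            if (packet != null)
            {
                sent.incrementAndGet(); // placeholder for the socket send
                continue;
            }
            synchronized (lock)
            {
                // Re-check under the lock so that a packet queued at the
                // last moment is not left without a thread to send it.
                if (queue.isEmpty())
                {
                    sendThread = null;
                    return;
                }
            }
        }
    }

    int sentCount()
    {
        return sent.get();
    }

    boolean isSenderRunning()
    {
        synchronized (lock)
        {
            return sendThread != null;
        }
    }
}
```

The cost of this approach is the extra bookkeeping needed to restart the thread safely, and the thread still lingers for one idle period after the stream has stopped.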

Best regards,
Lubo

···

On Mon, May 10, 2010 at 7:59 PM, Werner Dittmann <Werner.Dittmann@t-online.de> wrote:



#5

Hi Lubo,

> I didn't see an exit condition or a close() call.

As I said, I don't see one either :-)

> I guess the thread could, for example, die out by itself after some
> time of inactivity.

Hmm, not a good idea IMHO. It would be better to add a close() method
and call it from RTPConnectorImpl when it gets closed. WDYT?
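
A minimal sketch of the close() approach, under assumptions: the class ClosablePacketSender is hypothetical, byte[] stands in for RawPacket, and a counter stands in for the real socket send. Packets still queued at close time are dropped in this sketch.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sender whose thread is stopped by an explicit close(). */
final class ClosablePacketSender
{
    private final LinkedBlockingQueue<byte[]> queue
        = new LinkedBlockingQueue<byte[]>(256);
    private final AtomicInteger sent = new AtomicInteger();
    private volatile boolean closed = false;
    private final Thread sendThread;

    ClosablePacketSender()
    {
        sendThread = new Thread("ClosablePacketSender")
        {
            @Override
            public void run()
            {
                runInSendThread();
            }
        };
        sendThread.setDaemon(true);
        sendThread.start();
    }

    /** Queues a packet; returns false once close() has been called. */
    boolean write(byte[] packet)
    {
        if (closed)
            return false;
        try
        {
            queue.put(packet);
            return true;
        }
        catch (InterruptedException iex)
        {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    private void runInSendThread()
    {
        while (!closed)
        {
            try
            {
                queue.take();
                sent.incrementAndGet(); // placeholder for the socket send
            }
            catch (InterruptedException iex)
            {
                // close() interrupts a blocked take(); the loop condition
                // then decides whether to exit.
            }
        }
    }

    /** Stops the send thread and drops any packets still queued. */
    void close()
    {
        closed = true;
        sendThread.interrupt();
        try
        {
            sendThread.join(1000);
        }
        catch (InterruptedException iex)
        {
            Thread.currentThread().interrupt();
        }
        queue.clear(); // also frees a writer blocked on a full queue
    }

    int sentCount()
    {
        return sent.get();
    }

    boolean isRunning()
    {
        return sendThread.isAlive();
    }
}
```

In the real code the connector would call close() on its output stream when the connector itself is closed, as proposed above.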

Regards,
Werner

···

On 10.05.2010 19:40, Lubomir Marinov wrote:



#6

Great!

···

On Mon, May 10, 2010 at 9:18 PM, Werner Dittmann <Werner.Dittmann@t-online.de> wrote:

> > I guess the thread could, for example, die out by itself after some
> > time of inactivity.
>
> Hmmmm... not a good idea IMHO. Adding a close() call would be better
> and call this from RTPConnectorImpl when it gets closed. WDYT?
