Subscriber and publisher implementation with WCF

 19-Mar-2017   nityaprakash     WCF  DuplexBinding    Comments  0

This article explains how we can implement publisher and subscriber pattern in WCF. Why we need this pattern? Sometime, client send a request to server which invoke a process. This process is long running and client need to be updated with progress. If this progress is being recorded in database, than we can implement polling. We will make one request which will start processing of the request. This will be one way request, client won't expect any response back except that process started successfully or not. Than another request will be send to service to get the progress in some interval, until it stop getting the response.

Publisher/Subscriber pattern WCF

WCF provide different ways of communication between Client And Service. These communication pattern also called Message Patterns are as follows:

  1. Request and Response
  2. One Way
  3. Duplex Communication

Here we are going to discuss Duplex pattern. It allows two way communication between client and server.

How Does Duplex Service works.

Below image illustrate how communication between client and server works.

In Duplex pattern, both the client and the service can initiate communication. The client calls the method of the service. The service can then use a client callback to call a method in client.

To implement this pattern The Service has to expose two contract (Interface). One interface which will be implemented by Service for actual processing and client will call this service to perform that task. Another callback interface will be implement by the client and implement the client method which will be executed by service to report progress of the process. Client will create instanceContext with class which is implementing the Callback Interface and pass to the service. Service will access the reference with OperationContext and execute the method.

How to implement Subscriber and Publisher

We are going to create a simple demo. A PrepareService which expected to process some people Data and prepare for further processing. As it prepare each record, it notify the client so client can update the status on client side. We have created two interfaces IDataPrepareService and IDataPrepareServiceCallback as below:

IDataPrepareService


    [ServiceContract(CallbackContract =typeof(IDataPrepareServiceCallBack))]
    public interface IDataPrepareService
    {

        [OperationContract(IsOneWay = true)]
        void ProcessData(RecordData[] records);

    }
   

In this example, I have annotated the service with CallbackContract = typeof(IDataPrepareSericeCallBack)]. It tells the service, that this service is going to be used for Duplex Pattern. And Client has to implement this method if the want any message from service.

IDataPrepareServiceCallBack


    [ServiceContract]
    public interface IDataPrepareServiceCallBack
    {
        [OperationContract]
        void OnRecordProcessed(RecordData data);
    }
   

This interface has only one method, that will be called by the OperationContext.

PrepareDataService


    [ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Reentrant)]
    public class DataPrepareService : IDataPrepareService
    {
        public void ProcessData(RecordData[] records)
        {
            IDataPrepareServiceCallBack callback = OperationContext.Current.GetCallbackChannel();

            Task.Factory.StartNew(() =>
            {
                foreach (RecordData row in records)
                {
                    Thread.Sleep(1000);
                    
                    if( OperationContext.Current.Channel.State == CommunicationState.Opened)
                        callback.OnRecordProcessed(row);    
                }
            });
        }
    }
    

To simulate long running process, I have started process in separate task and used Thread.Sleep(1000), 1 second delay for processing each record. We are getting reference of Callback interface from OperationContext. I am checking if the connection to the client still open or not. If connection is open than execute the Callback method.

Note: There may be possibility that client is not maintaining the connection properly and channel object goes out of scope and closed by runtime. In this case service will throw exception and stop processing.

Implementing The Client

Client also plays major role here. Because, it has to manage and keep connection open until process completes to get notification from service. Once connection object destroys, it wont get any communication from service with respect to request sent previously.


[CallbackBehavior(ConcurrencyMode = ConcurrencyMode.Reentrant)]
    public class ProductServiceCallbackClient : ProductSvc.IDataPrepareServiceCallback, IDisposable
    {
        DataPrepareServiceClient proxy = null;
         
        public void Dispose()
        {
            proxy.Close();
        }

        public void ProcessData()
        {
            InstanceContext context = new InstanceContext(this);

            proxy = new DataPrepareServiceClient(context);

            RecordData[] data1 = new RecordData[20];

            for(int i = 0; i < data1.Length; i++)
            {
                data1[i] = new RecordData
                {
                    FirstName = "First" + (i + 1).ToString(),
                    MiddleName = "Middle" + (i + 1).ToString(),
                    LastName = "Last" + (i + 1).ToString()
                };

            }

            RecordData[] data2= new RecordData[20];

            for (int i = 0; i < data2.Length; i++)
            {
                data2[i] = new RecordData
                {
                    FirstName = "AAA" + (i + 1).ToString(),
                    MiddleName = "BBB" + (i + 1).ToString(),
                    LastName = "CCC" + (i + 1).ToString()
                };

            }

            proxy.ProcessData(data1);
            Console.WriteLine("Data 1 Started");
            Thread.Sleep(5000);
            proxy.ProcessData(data2);
            
            Console.WriteLine("Data 2 Started");
        }

        public void OnRecordProcessed(RecordData data)
        {
            Console.WriteLine("Processed : {0} {1} {2}", data.FirstName, data.MiddleName, data.LastName);
        }


    }
}
   

I created ProductServiceCallbackClient class which implement IDataPrepareServiceCallback interface. Create object of System.ServiceModel.InstanceContext with current object. Pass this object to DataPrepareServiceClient service proxy object. ProductServiceCallbackClient implements the OnRecordProcessed method from the Callback interface. This method will be called by the service, whenever service has to send notification to client.

Binding

We need wsDualHttpBinding to work over HTTP protocol. We need WCF at both side, client and Service. It doesn't working basicHttpBinding, wsHttpBinding. It also works with netTcpBinding and


    <services>
      <service name="ProductService.DataPrepareService">
        <clear />
        <endpoint address="" binding="wsDualHttpBinding" bindingConfiguration=""
          contract="ProductService.IDataPrepareService" />
      </service>
    </services>
    <behaviors>
      <serviceBehaviors>
        <behavior>
          <!-- To avoid disclosing metadata information, set the values below to false before deployment -->
          <serviceMetadata httpGetEnabled="true" httpsGetEnabled="true"/>
          <!-- To receive exception details in faults for debugging purposes, set the value below to true.  Set to false before deployment to avoid disclosing exception information -->
          <serviceDebug includeExceptionDetailInFaults="false"/>
        </behavior>
      </serviceBehaviors>
    </behaviors>
    <protocolMapping>
        <add binding="basicHttpsBinding" scheme="https"/>
    </protocolMapping> 
       

When we create proxy with Visual Studio tools, it will create all configuration and necessary proxy classes automatically in Client Side.

Concurrency and Reentrant

If you have seen, we have decorated ProductServiceCallbackClient and 'DataPrepareServicewith 'CallbackBehavior' and 'ServiceBehavior attribute respectively with 'ConcurrencyMode = ConcurrencyMode.Reentrant'. Why do we need to do this? In this example we are just showing the status and we have one OperationContract and one Callback method defined in this demo. But there might be the case when we have multiple OperationContract and Callback defined. When OperationContract call the Callback method at client side, it may call another method of service on same channel which may be again long process. If this happen, and service and client calls may end up in deadlock situation. To avoid this case we decorate client and service both with ConcurrencyMode.Reentrant.

Things to Remember

  1. Duplex messaging only possible with wsDualHtttpBinding over HTTP and also possible with netTcpBinding and netPeerTcpBinding by default.
  2. Need to have WCF both client and Server side.
  3. Service must Define Callback Contract and Client must implement it.
  4. Client must call service with InstanceContext to establish two communication.
  5. Client should keep connection open during the process to be communicated by service for the progress. If connection dropped, than service may also throw exception and client will not receive communication.
  6. Both client and Service classes should be decorated with ConcurrencyMode.Reenterant.

Code related to this service can be found on GitHub.


Download Code
Nitya Prakash Sharma has over 10 years of experience in .NET technology. He is currently working as Senior Consultant in industry. He is always keen to learn new things in Technology and eager to apply wherever is possible. He is also has interest in Photography, sketching and painting.

My Blog
Post Comment

COMMENTS