root/plugins/branches/0185_GEN_PHYSICS_REFACTOR_2/ZBoostNetworking/src/TransmissionControlProtocolService.cpp @ 3447

Revision 3447, 17.3 KB (checked in by mgray, 6 months ago)

Debug and refactor of the TCP Protocol Adapter in ZBoostNetworking.

Line 
1//-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~
2// Zen Enterprise Framework
3//
4// Copyright (C) 2001 - 2009 Tony Richards
5//
6//  This software is provided 'as-is', without any express or implied
7//  warranty.  In no event will the authors be held liable for any damages
8//  arising from the use of this software.
9//
10//  Permission is granted to anyone to use this software for any purpose,
11//  including commercial applications, and to alter it and redistribute it
12//  freely, subject to the following restrictions:
13//
14//  1. The origin of this software must not be misrepresented; you must not
15//     claim that you wrote the original software. If you use this software
16//     in a product, an acknowledgment in the product documentation would be
17//     appreciated but is not required.
18//  2. Altered source versions must be plainly marked as such, and must not be
19//     misrepresented as being the original software.
20//  3. This notice may not be removed or altered from any source distribution.
21//
22//  Tony Richards trichards@indiezen.com
23//-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~
24#ifndef NOMINMAX
25#define NOMINMAX
26#endif
27// This must be included first thanks to some Winblows crap.
28#include <boost/asio.hpp>
29
30#include "TransmissionControlProtocolService.hpp"
31
32#include "Endpoint.hpp"
33
34#include <Zen/Core/Utility/runtime_exception.hpp>
35
36#include <Zen/Core/Threading/ThreadFactory.hpp>
37#include <Zen/Core/Threading/MutexFactory.hpp>
38#include <Zen/Core/Threading/CriticalSection.hpp>
39
40#include <Zen/Core/Plugins/I_ConfigurationElement.hpp>
41
42#include <Zen/Core/Event/I_Event.hpp>
43#include <Zen/Core/Event/I_EventService.hpp>
44
45#include <Zen/Enterprise/AppServer/I_Message.hpp>
46#include <Zen/Enterprise/AppServer/I_MessageFactory.hpp>
47#include <Zen/Enterprise/AppServer/I_MessageType.hpp>
48#include <Zen/Enterprise/AppServer/I_MessageRegistry.hpp>
49#include <Zen/Enterprise/AppServer/I_MessageHeader.hpp>
50#include <Zen/Enterprise/AppServer/I_ApplicationServer.hpp>
51#include <Zen/Enterprise/AppServer/I_ApplicationServerManager.hpp>
52
53#include <boost/bind.hpp>
54
55#include <boost/archive/polymorphic_binary_oarchive.hpp>
56#include <boost/archive/polymorphic_binary_iarchive.hpp>
57
58#include <boost/iostreams/device/array.hpp>
59#include <boost/iostreams/stream.hpp>
60
61#include <iostream>
62
63#include <stdlib.h>
64#include <string.h>
65
66//-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~
67namespace Zen {
68namespace Enterprise {
69namespace AppServer {
70//-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~
71TransmissionControlProtocolService::TransmissionControlProtocolService(I_ApplicationServer& _server)
72:   m_appServer(_server)
73,   m_ioService()
74,   m_pWork(NULL)
75,   m_acceptor(m_ioService)
76,   m_address()
77,   m_port()
78,   m_threadCount(2)
79,   m_threadsStarted(false)
80,   m_pConnectionsGuard(Threading::MutexFactory::create())
81{
82}
83
84//-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~
85TransmissionControlProtocolService::~TransmissionControlProtocolService()
86{
87    Threading::MutexFactory::destroy(m_pConnectionsGuard);
88}
89
90//-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~
91I_ApplicationServer&
92TransmissionControlProtocolService::getApplicationServer()
93{
94    return m_appServer;
95}
96
97//-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~
98void
99TransmissionControlProtocolService::setConfiguration(const Plugins::I_ConfigurationElement& _config)
100{
101    m_address = _config.getAttribute("address");
102    m_port = _config.getAttribute("port");
103    m_threadCount = strtol(_config.getAttribute("threads").c_str(), NULL, 10);
104
105    if (m_address.empty() || m_port.empty())
106    {
107        m_isServer = false;
108    }
109    else
110    {
111        m_isServer = true;
112    }
113}
114
115//-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~
116TransmissionControlProtocolService::pEndpoint_type
117TransmissionControlProtocolService::resolveEndpoint(const std::string& _address, const std::string& _port)
118{
119    boost::asio::ip::tcp::resolver resolver(m_ioService);
120    // TODO _address and _port or v4() and m_port?
121    boost::asio::ip::tcp::resolver::query query(_address, _port);
122    boost::asio::ip::tcp::endpoint endpoint = *resolver.resolve(query);
123
124    pEndpoint_type pEndpoint(new Endpoint(getSelfReference(), endpoint), boost::bind(&TransmissionControlProtocolService::destroyEndpoint, this, _1));
125
126    // Default to true.  Generally an endpoint is outbound when resolveEndpoint()
127    // is called.  Since "listen()" is not a valid method (listen ports are
128    // determined by the configuration) then we probably aren't ever creating
129    // a non-local endpoint.
130    pEndpoint->setIsLocal(false);
131    return pEndpoint;
132}
133
134//-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~
135void
136TransmissionControlProtocolService::sendTo(pMessage_type _pMessage, pEndpoint_type _pEndpoint)
137{
138    // TODO Push onto a queue and handle in a worker thread?
139    {
140        typedef Memory::managed_ptr<Endpoint> pConcreteEndpoint_type;
141        pConcreteEndpoint_type pEndpoint(_pEndpoint.as<pConcreteEndpoint_type>());
142
143        pConnection_type pConnection;
144
145        // Find or create the connection.
146        {
147            Threading::CriticalSection lock(m_pConnectionsGuard);
148
149            // Find the connection associated with this endpoint.
150            ConnectionMap_type::iterator iter = m_connectionMap.find(pEndpoint->getEndpoint());
151
152            if (iter == m_connectionMap.end())
153            {
154                if (m_isServer)
155                {
156                    // TODO Error?
157                    return;
158                }
159
160                pConnection = m_pNewConnection;
161
162                createConnection();
163
164                // Assume this is an outbound endpoint.
165                pEndpoint->setIsLocal(false);
166
167                m_connectionMap[pEndpoint->getEndpoint()] = pConnection;
168                pConnection->connect(_pEndpoint);
169            }
170            else
171            {
172                pConnection = iter->second;
173            }
174        }
175
176        // TODO Create a task to handle this asynchronously?
177        std::stringstream buffer;
178
179        boost::archive::polymorphic_binary_oarchive archive(buffer,
180                                                            boost::archive::no_header |
181                                                            boost::archive::no_tracking);
182
183        // Serialize the header
184        // TODO Handle the version correctly
185        _pMessage->getMessageHeader()->serialize(archive, 0);
186
187        // Serialize the rest of the message
188        _pMessage->serialize(archive, 0);
189
190        pConnection->write(buffer.str().c_str(), (boost::uint32_t)buffer.str().length());
191
192    }
193
194    // Make sure the threads have been started.
195    startThreads();
196}
197
198//-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~
199Event::I_Event&
200TransmissionControlProtocolService::getConnectedEvent()
201{
202    return getApplicationServer().getEventService()->getEvent("TransmissionControlProtocolService/connectedEvent");
203}
204
205//-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~
206Event::I_Event&
207TransmissionControlProtocolService::getDisconnectedEvent()
208{
209    return getApplicationServer().getEventService()->getEvent("TransmissionControlProtocolService/disconnectedEvent");
210}
211
212//-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~
213void
214TransmissionControlProtocolService::destroyEndpoint(wpEndpoint_type _pEndpoint)
215{
216    Endpoint* pEndpoint = dynamic_cast<Endpoint*>(_pEndpoint.get());
217    if (pEndpoint != NULL)
218    {
219        delete pEndpoint;
220    }
221    else
222    {
223        // TODO Error
224    }
225}
226
227//-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~
228Threading::I_Condition*
229TransmissionControlProtocolService::prepareToStart(Threading::ThreadPool& _threadPool)
230{
231    if (m_isServer)
232    {
233        // Resolve the address
234        boost::asio::ip::tcp::resolver resolver(m_ioService);
235        boost::asio::ip::tcp::resolver::query query(m_address, m_port);
236        boost::asio::ip::tcp::endpoint endpoint = *resolver.resolve(query);
237
238        // Bind to the address
239        m_acceptor.open(endpoint.protocol());
240        m_acceptor.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
241        m_acceptor.bind(endpoint);
242    }
243
244    // Create a new connection
245    createConnection();
246
247    // Ready to go, so don't bother returning a condition variable
248    return NULL;
249}
250
251//-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~
252void
253TransmissionControlProtocolService::start()
254{
255    if (m_isServer)
256    {
257        // Start listening
258        boost::system::error_code ec;
259        m_acceptor.listen(boost::asio::socket_base::max_connections, ec);
260
261        if (ec)
262        {
263            std::cout << "Error: " << ec << std::endl;
264        }
265
266        // Asyncronously accept a new connection
267        asyncAccept();
268    }
269   
270    startThreads();
271}
272
273//-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~
274Threading::I_Condition*
275TransmissionControlProtocolService::prepareToStop()
276{
277    delete m_pWork;
278
279    Threading::CriticalSection lock(m_pConnectionsGuard);
280
281    for(Threads_type::iterator iter = m_threads.begin(); iter != m_threads.end(); iter++)
282    {
283        (*iter)->stop();
284    }
285
286    m_ioService.stop();
287
288    return NULL;
289}
290
291//-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~
292void
293TransmissionControlProtocolService::stop()
294{
295    // Join all of the threads
296    for(Threads_type::iterator iter = m_threads.begin(); iter != m_threads.end(); iter++)
297    {
298        (*iter)->join();
299        Threading::ThreadFactory::destroy(*iter);
300    }
301
302    // Don't lock until we're done joining, otherwise we may get
303    // a deadlock.
304    Threading::CriticalSection lock(m_pConnectionsGuard);
305
306    m_threads.clear();
307}
308
309//-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~
310inline
311void
312TransmissionControlProtocolService::startThreads()
313{
314    // Check to make sure the threads have not been started
315    if (m_threadsStarted)
316    {
317        return;
318    }
319
320    // Not started, but need to double-check while inside of
321    // a critical section
322    Threading::CriticalSection lock(m_pConnectionsGuard);
323
324    // Still not started?
325    if (m_threadsStarted)
326    {
327        return;
328    }
329
330    // Start the threads
331
332    class Runnable
333    :   public Threading::I_Runnable
334    {
335    public:
336        virtual void run() throw()
337        {
338            while(!m_stopping)
339            {
340                try
341                {
342                    boost::system::error_code ec;
343                    m_ioService.run(ec);
344                   
345                    if( ec )
346                    {
347                        std::cout << "ASIO Error: " << ec << std::endl;
348                    }
349                }
350                catch(std::exception& _ex)
351                {
352                    std::cout << "Exception in ASIO loop: " << _ex.what() << std::endl;
353                }
354                catch(...)
355                {
356                    std::cout << "Unknown exception in ASIO loop" << std::endl;
357                }
358            }
359        }
360
361        virtual void stop()
362        {
363            m_stopping = true;
364        }
365
366        Runnable(boost::asio::io_service& _ioService) 
367            : m_ioService(_ioService), m_stopping(false) {}
368       
369        boost::asio::io_service& m_ioService;
370        volatile bool m_stopping;
371    };
372
373    m_pWork = new boost::asio::io_service::work(m_ioService);
374
375    // Reserve the correct amount of space.
376    m_threads.reserve(m_threadCount);
377
378    // Start some threads that will execute m_ioService.run()
379    for(int x = 0; x < m_threadCount; x++)
380    {
381        Zen::Threading::I_Thread* pThread = Zen::Threading::ThreadFactory::create(new Runnable(m_ioService));
382        m_threads.push_back(pThread);
383        pThread->start();
384    }
385
386    m_threadsStarted = true;
387
388}
389
390//-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~
391void
392TransmissionControlProtocolService::handleAccept(const boost::system::error_code& _error)
393{
394    if (!_error)
395    {
396        // The new connection is now connected, so start it.
397        pEndpoint_type pEndpoint(new Endpoint(getSelfReference(), m_endpoint), boost::bind(&TransmissionControlProtocolService::destroyEndpoint, this, _1));
398
399        // This endpoint is not local since it was established from an accept.
400        pEndpoint->setIsLocal(false);
401
402        m_pNewConnection->start(pEndpoint);
403
404        // Now, create another connection and do an async accept on it.
405        createConnection();
406
407        // And asynchronously accept the new connection.
408        asyncAccept();
409    }
410}
411
412//-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~
413void
414TransmissionControlProtocolService::createConnection()
415{
416    boost::shared_ptr<TCP::Connection> pConnection(new TCP::Connection(m_ioService, *this));
417    m_pNewConnection.swap(pConnection);
418}
419
420//-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~
421void
422TransmissionControlProtocolService::asyncAccept()
423{
424    // TODO Add endpoint here?
425
426    // Start the async accept using handleAccept() as the callback
427    m_acceptor.async_accept(m_pNewConnection->getSocketReference(),
428        m_endpoint,
429        boost::bind(&TransmissionControlProtocolService::handleAccept, this,
430        boost::asio::placeholders::error)
431    );
432}
433
434//-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~
435void
436TransmissionControlProtocolService::onConnected(pConnection_type _pConnection)
437{
438    Threading::CriticalSection lock(m_pConnectionsGuard);
439
440    typedef Memory::managed_ptr<Endpoint>   pConcreteEndpoint_type;
441
442    m_connectionMap[_pConnection->getPeer().as<pConcreteEndpoint_type>()->getEndpoint()] = _pConnection;
443}
444
445//-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~
446void
447TransmissionControlProtocolService::onDisconnected(pConnection_type _pConnection)
448{
449    Threading::CriticalSection lock(m_pConnectionsGuard);
450
451    typedef Memory::managed_ptr<Endpoint>   pConcreteEndpoint_type;
452
453    m_connectionMap.erase(_pConnection->getPeer().as<pConcreteEndpoint_type>()->getEndpoint());
454
455    // Dispatch this event.
456    getDisconnectedEvent().fireEvent(_pConnection->getPeer());
457}
458
459//-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~
460void
461TransmissionControlProtocolService::onHandleMessage(pConnection_type  _pConnection, TCP::MessageBuffer& _message)
462{
463#if 1
464    boost::iostreams::stream<boost::iostreams::array> 
465        stream(_message.getBody(), _message.getBodyLength());
466
467    boost::archive::polymorphic_binary_iarchive archive(stream, 
468                                                        boost::archive::no_header |
469                                                        boost::archive::no_tracking);
470
471
472   
473    // Deserialize the header
474    I_Message::pMessageHeader_type pHeader = getApplicationServer().getMessageRegistry()->getMessageHeader(archive);
475
476    // Construct the appropriate message
477    // The way we're doing it now, we don't know which one of these to call
478    // create() or createResponse().
479    // If we put that detail in the header somehow, then we can either call
480    // the correct method... *or* we can let the create() method figure it out.
481    pMessage_type pMessage = pHeader->getMessageType()->getMessageFactory()
482        ->create(
483            pHeader, 
484            _pConnection->getPeer(), 
485            pEndpoint_type()
486        );
487
488    // Deserialize the message
489    pMessage->serialize(pHeader, archive, 0);
490
491    // Send the message to the application server
492    getApplicationServer().handleMessage(pMessage);
493#else
494    std::stringbuf buffer(std::ios_base::in|std::ios_base::binary);
495
496    buffer.pubsetbuf(_message.getBody(), _message.getBodyLength());
497
498    std::stringstream stream;
499    buffer.pubseekpos(0);
500    stream.str(buffer.str());
501
502    boost::archive::polymorphic_binary_iarchive archive(stream, 
503                                                        boost::archive::no_header |
504                                                        boost::archive::no_tracking);
505
506
507   
508    // Deserialize the header
509    I_Message::pMessageHeader_type pHeader = getApplicationServer().getMessageRegistry()->getMessageHeader(archive);
510
511    // Construct the appropriate message
512    pMessage_type pMessage = pHeader->getMessageType()->getMessageFactory()
513        ->create(
514            pHeader, 
515            _pConnection->getPeer(), 
516            pEndpoint_type()
517        );
518
519    // Deserialize the message
520    pMessage->serialize(pHeader, archive, 0);
521
522    // Send the message to the application server
523    getApplicationServer().handleMessage(pMessage);
524#endif
525}
526
527//-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~
528}   // namespace AppServer
529}   // namespace Enterprise
530}   // namespace Zen
531//-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~
Note: See TracBrowser for help on using the browser.